logrepl

package
v0.14.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 12, 2025 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Cleanup added in v0.7.1

func Cleanup(ctx context.Context, c CleanupConfig) error

Cleanup drops the provided replication slot and publication. It will terminate any backends consuming the replication slot before deletion.

Types

type CDCConfig added in v0.7.1

type CDCConfig struct {
	LSN             pglogrepl.LSN
	SlotName        string
	PublicationName string
	Tables          []string
	TableKeys       map[string]string
	WithAvroSchema  bool
	// BatchSize is the maximum size of a batch that will be read from the DB
	// in one go and processed by the CDCHandler.
	BatchSize int
}

CDCConfig holds configuration values for CDCIterator.

type CDCHandler

type CDCHandler struct {
	// contains filtered or unexported fields
}

CDCHandler is responsible for handling logical replication messages, converting them to a record and sending them to a channel.

func NewCDCHandler

func NewCDCHandler(
	ctx context.Context,
	rs *internal.RelationSet,
	tableKeys map[string]string,
	out chan<- []opencdc.Record,
	withAvroSchema bool,
	batchSize int,
	flushInterval time.Duration,
) *CDCHandler

func (*CDCHandler) Handle

Handle is the handler function that receives all logical replication messages. Returns non-zero LSN when a record was emitted for the message.

type CDCIterator

type CDCIterator struct {
	// contains filtered or unexported fields
}

CDCIterator asynchronously listens for events from the logical replication slot and returns them to the caller through NextN.

func NewCDCIterator

func NewCDCIterator(ctx context.Context, pool *pgxpool.Pool, c CDCConfig) (*CDCIterator, error)

NewCDCIterator initializes logical replication by creating the publication and subscription manager.

func (*CDCIterator) Ack

func (i *CDCIterator) Ack(_ context.Context, sdkPos opencdc.Position) error

Ack forwards the acknowledgment to the subscription.

func (*CDCIterator) NextN added in v0.12.0

func (i *CDCIterator) NextN(ctx context.Context, n int) ([]opencdc.Record, error)

NextN returns up to n records from the internal channel with records. NextN is allowed to block until either at least one record is available or the context gets canceled.

func (*CDCIterator) StartSubscriber added in v0.7.1

func (i *CDCIterator) StartSubscriber(ctx context.Context) error

StartSubscriber starts the logical replication service in the background. Blocks until the subscription becomes ready.

func (*CDCIterator) TXSnapshotID added in v0.7.1

func (i *CDCIterator) TXSnapshotID() string

TXSnapshotID returns the transaction snapshot which is received when the replication slot is created. The value can be empty, when the iterator is resuming.

func (*CDCIterator) Teardown

func (i *CDCIterator) Teardown(ctx context.Context) error

Teardown stops the CDC subscription and blocks until the subscription is done or the context gets canceled. If the subscription stopped with an unexpected error, the error is returned.

type CleanupConfig added in v0.7.1

type CleanupConfig struct {
	URL             string
	SlotName        string
	PublicationName string
}

type CombinedIterator added in v0.7.1

type CombinedIterator struct {
	// contains filtered or unexported fields
}

func NewCombinedIterator added in v0.7.1

func NewCombinedIterator(ctx context.Context, pool *pgxpool.Pool, conf Config) (*CombinedIterator, error)

NewCombinedIterator will initialize and start the Snapshot and CDC iterators. Failure to parse the position or validate the config will return an error.

func (*CombinedIterator) Ack added in v0.7.1

func (*CombinedIterator) NextN added in v0.12.0

func (c *CombinedIterator) NextN(ctx context.Context, n int) ([]opencdc.Record, error)

NextN retrieves up to n records from the active iterator. If the end of the snapshot is reached during this call, it will switch to the CDC iterator and continue retrieving records from there.

func (*CombinedIterator) Teardown added in v0.7.1

func (c *CombinedIterator) Teardown(ctx context.Context) error

Teardown will stop and teardown the CDC and Snapshot iterators.

type Config

type Config struct {
	Position        opencdc.Position
	SlotName        string
	PublicationName string
	Tables          []string
	TableKeys       map[string]string
	WithSnapshot    bool
	WithAvroSchema  bool
	BatchSize       int
}

func (Config) Validate added in v0.7.1

func (c Config) Validate() error

Validate performs validation tasks on the config.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL