binlog

package
v0.3.9
Warning: This package is not in the latest version of its module.
Published: Dec 29, 2025 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func GetCurrentBinlogPosition added in v0.2.5

func GetCurrentBinlogPosition(ctx context.Context, client *sqlx.DB) (mysql.Position, error)

GetCurrentBinlogPosition retrieves the current binlog position from MySQL.
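A returned position identifies a binlog file and a byte offset within it. The sketch below shows one way such positions could be ordered, for example to check whether a saved checkpoint is behind the server's current position. `Position` here is a local stand-in for `mysql.Position`, and `fileSeq` and `comparePositions` are hypothetical helpers, not part of this package; the sketch assumes binlog file names end in a numeric sequence such as `mysql-bin.000003`.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// Position is a local stand-in for mysql.Position:
// a binlog file name plus a byte offset in that file.
type Position struct {
	Name string
	Pos  uint32
}

// fileSeq extracts the numeric suffix of a binlog file name,
// e.g. "mysql-bin.000003" -> 3. It returns -1 if no numeric suffix is found.
func fileSeq(name string) int {
	i := strings.LastIndex(name, ".")
	if i < 0 {
		return -1
	}
	n, err := strconv.Atoi(name[i+1:])
	if err != nil {
		return -1
	}
	return n
}

// comparePositions returns -1, 0, or 1 when a is before, equal to, or after b.
// File sequence numbers are compared first, then offsets.
func comparePositions(a, b Position) int {
	sa, sb := fileSeq(a.Name), fileSeq(b.Name)
	switch {
	case sa < sb:
		return -1
	case sa > sb:
		return 1
	case a.Pos < b.Pos:
		return -1
	case a.Pos > b.Pos:
		return 1
	}
	return 0
}

func main() {
	saved := Position{Name: "mysql-bin.000003", Pos: 154}
	current := Position{Name: "mysql-bin.000010", Pos: 4}
	fmt.Println(comparePositions(saved, current)) // -1: the checkpoint is behind
}
```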

Types

type Binlog

type Binlog struct {
	Position mysql.Position `json:"position"`
}

Binlog holds the current binlog position.
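Because the struct carries a `json:"position"` tag, it can plausibly serve as persisted checkpoint state. The following is a minimal sketch of a save and restore round trip using stand-in types. The inner field names in the JSON assume that `mysql.Position` exposes `Name` and `Pos` without JSON tags of its own; `saveState` and `loadState` are hypothetical helpers.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Position is a local stand-in for mysql.Position.
type Position struct {
	Name string
	Pos  uint32
}

// Binlog mirrors the documented struct, using the stand-in Position.
type Binlog struct {
	Position Position `json:"position"`
}

// saveState serializes the state to a JSON string.
func saveState(b Binlog) (string, error) {
	out, err := json.Marshal(b)
	return string(out), err
}

// loadState parses a JSON string produced by saveState.
func loadState(s string) (Binlog, error) {
	var b Binlog
	err := json.Unmarshal([]byte(s), &b)
	return b, err
}

func main() {
	saved, err := saveState(Binlog{Position: Position{Name: "mysql-bin.000003", Pos: 154}})
	if err != nil {
		panic(err)
	}
	fmt.Println(saved) // {"position":{"Name":"mysql-bin.000003","Pos":154}}

	restored, err := loadState(saved)
	if err != nil {
		panic(err)
	}
	fmt.Println(restored.Position.Name, restored.Position.Pos) // mysql-bin.000003 154
}
```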

type CDCChange

type CDCChange struct {
	Stream    types.StreamInterface
	Timestamp time.Time
	Position  mysql.Position
	Kind      string
	Schema    string
	Table     string
	Data      map[string]interface{}
}

CDCChange represents a change event captured from the binlog.

type ChangeFilter

type ChangeFilter struct {
	// contains filtered or unexported fields
}

ChangeFilter filters binlog events based on the specified streams.

func NewChangeFilter

func NewChangeFilter(typeConverter func(value interface{}, columnType string) (interface{}, error), streams ...types.StreamInterface) ChangeFilter

NewChangeFilter creates a filter for the given streams.
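The `typeConverter` argument is a plain function from a raw value and a column type name to a converted value. As a hedged illustration, a converter with the matching signature might look like the sketch below. The column type names it handles (`int`, `bigint`, `varchar`, `text`) and the decision to normalize `[]byte` to `string` are assumptions for this example, not behavior documented by the package.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// convertValue has the same signature as the typeConverter parameter.
// The column type names handled here are illustrative assumptions.
func convertValue(value interface{}, columnType string) (interface{}, error) {
	if value == nil {
		return nil, nil // pass SQL NULL through unchanged
	}
	// Row values may arrive as raw bytes; normalize them to a string first.
	if b, ok := value.([]byte); ok {
		value = string(b)
	}
	switch strings.ToLower(columnType) {
	case "int", "bigint":
		switch v := value.(type) {
		case int64:
			return v, nil
		case int32:
			return int64(v), nil
		case string:
			n, err := strconv.ParseInt(v, 10, 64)
			if err != nil {
				return nil, err
			}
			return n, nil
		}
		return nil, fmt.Errorf("cannot convert %T to %s", value, columnType)
	case "varchar", "text":
		return fmt.Sprint(value), nil
	default:
		// Unknown column types are returned as-is.
		return value, nil
	}
}

func main() {
	v, err := convertValue([]byte("42"), "INT")
	fmt.Println(v, err) // 42 <nil>
}
```

A function like this would be passed as the first argument to NewChangeFilter, followed by the streams to match.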

func (ChangeFilter) FilterRowsEvent

func (f ChangeFilter) FilterRowsEvent(ctx context.Context, e *replication.RowsEvent, ev *replication.BinlogEvent, callback abstract.CDCMsgFn) error

FilterRowsEvent processes RowsEvent and calls the callback for matching streams.

type Config

type Config struct {
	ServerID        uint32
	Flavor          string
	Host            string
	Port            uint16
	User            string
	Password        string
	Charset         string
	VerifyChecksum  bool
	HeartbeatPeriod time.Duration
	InitialWaitTime time.Duration
	SSHClient       *ssh.Client
}

Config holds the configuration for the binlog syncer.
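To make the fields concrete, here is a sketch of populating such a configuration. The struct below is a local copy of the documented fields (with `SSHClient` omitted so the example needs only the standard library), and `withDefaults` is a hypothetical helper. The default values it fills in are assumptions for illustration, not defaults applied by this package.

```go
package main

import (
	"fmt"
	"time"
)

// Config is a local copy of the documented fields, minus SSHClient.
type Config struct {
	ServerID        uint32
	Flavor          string
	Host            string
	Port            uint16
	User            string
	Password        string
	Charset         string
	VerifyChecksum  bool
	HeartbeatPeriod time.Duration
	InitialWaitTime time.Duration
}

// withDefaults fills unset fields with illustrative values (assumptions).
func withDefaults(c Config) Config {
	if c.Flavor == "" {
		c.Flavor = "mysql"
	}
	if c.Port == 0 {
		c.Port = 3306 // standard MySQL port
	}
	if c.Charset == "" {
		c.Charset = "utf8mb4"
	}
	if c.HeartbeatPeriod == 0 {
		c.HeartbeatPeriod = 30 * time.Second
	}
	return c
}

func main() {
	cfg := withDefaults(Config{
		ServerID: 1001, // should be unique among replicas of the same server
		Host:     "127.0.0.1",
		User:     "repl",
		Password: "secret",
	})
	fmt.Printf("%s://%s@%s:%d heartbeat=%s\n",
		cfg.Flavor, cfg.User, cfg.Host, cfg.Port, cfg.HeartbeatPeriod)
}
```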

type Connection

type Connection struct {
	CurrentPos mysql.Position // Current binlog position
	ServerID   uint32
	// contains filtered or unexported fields
}

Connection manages the binlog syncer and streamer for multiple streams.

func NewConnection

func NewConnection(_ context.Context, config *Config, pos mysql.Position, streams []types.StreamInterface, typeConverter func(value interface{}, columnType string) (interface{}, error)) (*Connection, error)

NewConnection creates a new binlog connection starting from the given position.

func (*Connection) Cleanup

func (c *Connection) Cleanup()

Cleanup terminates the binlog syncer.

func (*Connection) StreamMessages

func (c *Connection) StreamMessages(ctx context.Context, client *sqlx.DB, callback abstract.CDCMsgFn) error

type OnChange

type OnChange func(change CDCChange) error

OnChange is a callback function type for processing CDC changes.
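As an illustration of the callback shape, the sketch below builds an OnChange-style handler that tallies events per kind. `CDCChange` is a trimmed local stand-in (the `Stream` and `Position` fields are omitted), `countByKind` is a hypothetical helper, and the kind strings `"insert"` and `"delete"` are assumptions; the values the package actually emits are not documented here.

```go
package main

import (
	"fmt"
	"time"
)

// CDCChange is a trimmed stand-in for the documented struct.
type CDCChange struct {
	Timestamp time.Time
	Kind      string
	Schema    string
	Table     string
	Data      map[string]interface{}
}

// OnChange matches the documented callback type.
type OnChange func(change CDCChange) error

// countByKind returns a callback that tallies events per kind into counts.
// It rejects changes that carry no table name.
func countByKind(counts map[string]int) OnChange {
	return func(change CDCChange) error {
		if change.Table == "" {
			return fmt.Errorf("change has no table name")
		}
		counts[change.Kind]++
		return nil
	}
}

func main() {
	counts := map[string]int{}
	handle := countByKind(counts)

	events := []CDCChange{
		{Kind: "insert", Schema: "shop", Table: "orders", Data: map[string]interface{}{"id": 1}},
		{Kind: "insert", Schema: "shop", Table: "orders", Data: map[string]interface{}{"id": 2}},
		{Kind: "delete", Schema: "shop", Table: "orders", Data: map[string]interface{}{"id": 1}},
	}
	for _, ev := range events {
		if err := handle(ev); err != nil {
			fmt.Println("skipping event:", err)
		}
	}
	fmt.Println(counts["insert"], counts["delete"]) // 2 1
}
```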
