offset

package
v0.0.0-...-77b25ca Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 30, 2026 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrStoreClosed = errors.New("offset store is closed")

ErrStoreClosed is returned when operating on a closed store

Functions

This section is empty.

Types

type Offset

type Offset struct {
	LastLSN   string    `json:"last_lsn"` // Last LSN from fetched data
	NextLSN   string    `json:"next_lsn"` // incrementLSN(last_lsn) - cached next start point
	UpdatedAt time.Time `json:"updated_at"`
}

Offset stores the LSN position for each table last_lsn: last LSN from fetched data next_lsn: incrementLSN(last_lsn) - pre-computed next start point (cached, not for comparison)

type SQLiteStore

type SQLiteStore struct {
	// contains filtered or unexported fields
}

SQLiteStore manages LSN offsets using a dedicated SQLite database. This DB is separate from the CDC store DB for transactional safety. Writes are immediate (no buffering), but Flush() is exposed for interface compatibility.

func New

func New(ctx context.Context, dbPath string) (*SQLiteStore, error)

New creates a new standalone SQLite offset store with its own DB file. It runs migrations separately from the CDC store DB.

func (*SQLiteStore) Close

func (s *SQLiteStore) Close() error

Close closes the store and the underlying database connection.

func (*SQLiteStore) Flush

func (s *SQLiteStore) Flush() error

Flush ensures all buffered writes are committed to the database. For the offset store, writes are immediate, but this method is kept for interface compatibility.

func (*SQLiteStore) Get

func (s *SQLiteStore) Get(table string) (Offset, error)

Get returns the LSN for a table

func (*SQLiteStore) GetAll

func (s *SQLiteStore) GetAll() (map[string]Offset, error)

GetAll returns all offsets

func (*SQLiteStore) Load

func (s *SQLiteStore) Load() error

Load is a no-op: data is already in the DB.

func (*SQLiteStore) Save

func (s *SQLiteStore) Save() error

Save is a no-op: writes are immediate.

func (*SQLiteStore) Set

func (s *SQLiteStore) Set(table string, lastLSN string, nextLSN string) error

Set updates the LSN for a table lastLSN: last LSN from fetched data nextLSN: incrementLSN(lastLSN) - cached next start point

type StoreInterface

type StoreInterface interface {
	Load() error
	Save() error
	Flush() error
	Get(table string) (Offset, error)
	Set(table string, lastLSN string, nextLSN string) error
	GetAll() (map[string]Offset, error)
}

StoreInterface defines the interface for offset storage

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL