abstract

package
v0.4.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 20, 2026 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

Functions

func ReformatCursorValue added in v0.2.7

func ReformatCursorValue(cursorField string, cursorValue any, stream types.StreamInterface) (any, error)

ReformatCursorValue parses the cursor value into the correct type.

Types

type AbstractDriver

// AbstractDriver is the type returned by NewAbstractDriver, which builds it
// from a DriverInterface. Its method set includes Setup, SetupState, Spec, Type
// and GetConfigRef (names shared with DriverInterface) as well as Discover,
// Backfill, Incremental, RunChangeStream, Read, ClearState and
// FormatCursorValue.
type AbstractDriver struct {
	// NOTE(review): the role of the two groups below is not visible from this
	// listing; judging by the names they presumably scope connection-level and
	// context-level work respectively — confirm against utils.CxGroup.
	GlobalConnGroup *utils.CxGroup
	GlobalCtxGroup  *utils.CxGroup
	// contains filtered or unexported fields
}

func NewAbstractDriver

func NewAbstractDriver(ctx context.Context, driver DriverInterface) *AbstractDriver

func (*AbstractDriver) Backfill

func (a *AbstractDriver) Backfill(mainCtx context.Context, backfilledStreams chan string, pool *destination.WriterPool, stream types.StreamInterface) error

func (*AbstractDriver) ClearState added in v0.2.9

func (a *AbstractDriver) ClearState(streams []types.StreamInterface) (*types.State, error)

func (*AbstractDriver) Discover

func (a *AbstractDriver) Discover(ctx context.Context, maxDiscoverThreads int) ([]*types.Stream, error)

func (*AbstractDriver) FormatCursorValue added in v0.3.14

func (a *AbstractDriver) FormatCursorValue(cursorValue any) any

FormatCursorValue normalizes time and object ID cursor values to a consistent format before they are saved in state.

func (*AbstractDriver) GetConfigRef

func (a *AbstractDriver) GetConfigRef() Config

func (*AbstractDriver) Incremental added in v0.1.6

func (a *AbstractDriver) Incremental(mainCtx context.Context, pool *destination.WriterPool, streams ...types.StreamInterface) error

func (*AbstractDriver) Read

func (a *AbstractDriver) Read(ctx context.Context, pool *destination.WriterPool, backfillStreams, cdcStreams, incrementalStreams []types.StreamInterface) error

func (*AbstractDriver) RunChangeStream

func (a *AbstractDriver) RunChangeStream(mainCtx context.Context, pool *destination.WriterPool, streams ...types.StreamInterface) error

RunChangeStream orchestrates the CDC sync process:

  1. Pre-CDC: Initialize driver-specific CDC state
  2. Backfill: Load historical data for streams that need it
  3. CDC: Start change data capture based on execution mode:

  • Sequential: Process streams one at a time after all backfills complete
  • Parallel: Process all streams simultaneously after all backfills complete
  • Concurrent: Start each stream's CDC immediately after its backfill completes (can overlap)

func (*AbstractDriver) Setup

func (a *AbstractDriver) Setup(ctx context.Context) error

func (*AbstractDriver) SetupState

func (a *AbstractDriver) SetupState(state *types.State)

func (*AbstractDriver) Spec

func (a *AbstractDriver) Spec() any

func (*AbstractDriver) Type added in v0.1.2

func (a *AbstractDriver) Type() string

type BackfillMsgFn

// BackfillMsgFn is the callback type accepted by DriverInterface.ChunkIterator
// (as processFn) and DriverInterface.StreamIncrementalChanges (as cb). It
// receives a context and a single message as a map[string]any and returns an
// error.
type BackfillMsgFn func(ctx context.Context, message map[string]any) error

type CDCChange

// CDCChange is the message value handed to a CDCMsgFn callback.
type CDCChange struct {
	Stream       types.StreamInterface // presumably the stream this change belongs to — confirm
	Timestamp    time.Time             // presumably when the change occurred at the source — confirm
	Kind         string                // NOTE(review): likely the operation type; allowed values are not visible here
	Data         map[string]any        // presumably the changed record's column/field values — confirm
	ExtraColumns map[string]any // Driver-specific CDC metadata (e.g., LSN, binlog position, resume token)
}

type CDCMsgFn

// CDCMsgFn is the callback type accepted by DriverInterface.StreamChanges (as
// processFn). It receives a context and a single CDCChange and returns an
// error.
type CDCMsgFn func(ctx context.Context, message CDCChange) error

type Config

// Config is the type returned by GetConfigRef on both DriverInterface and
// AbstractDriver. Implementations only need to provide Validate.
type Config interface {
	// Validate reports an error for the configuration.
	// NOTE(review): the exact validation rules are implementation-specific and
	// not visible here.
	Validate() error
}

type DriverInterface

// DriverInterface is the set of methods a concrete driver supplies;
// NewAbstractDriver accepts a value of this type. The methods are grouped
// below by the phase they belong to (setup, discover, backfill, incremental,
// CDC).
type DriverInterface interface {
	GetConfigRef() Config
	Spec() any
	Type() string
	// specific to test & setup
	Setup(ctx context.Context) error
	SetupState(state *types.State)
	// sync artifacts
	MaxConnections() int
	MaxRetries() int
	// specific to discover
	GetStreamNames(ctx context.Context) ([]string, error)
	ProduceSchema(ctx context.Context, stream string) (*types.Stream, error)
	// specific to backfill
	GetOrSplitChunks(ctx context.Context, pool *destination.WriterPool, stream types.StreamInterface) (*types.Set[types.Chunk], error)
	ChunkIterator(ctx context.Context, stream types.StreamInterface, chunk types.Chunk, processFn BackfillMsgFn) error
	// specific to incremental
	FetchMaxCursorValues(ctx context.Context, stream types.StreamInterface) (any, any, error)
	StreamIncrementalChanges(ctx context.Context, stream types.StreamInterface, cb BackfillMsgFn) error
	// specific to cdc
	CDCSupported() bool
	ChangeStreamConfig() (sequential bool, parallel bool, concurrent bool)
	PreCDC(ctx context.Context, streams []types.StreamInterface) error // to init state
	StreamChanges(ctx context.Context, identifier int, processFn CDCMsgFn) error
	PostCDC(ctx context.Context, identifier int) error // to save state
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL