abstract

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 13, 2025 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

Functions

func RetryOnBackoff

func RetryOnBackoff(attempts int, sleep time.Duration, f func() error) (err error)

Types

type AbstractDriver

type AbstractDriver struct {
	GlobalConnGroup *utils.CxGroup
	GlobalCtxGroup  *utils.CxGroup
	// contains filtered or unexported fields
}

func NewAbstractDriver

func NewAbstractDriver(ctx context.Context, driver DriverInterface) *AbstractDriver

func (*AbstractDriver) Backfill

func (a *AbstractDriver) Backfill(ctx context.Context, backfilledStreams chan string, pool *destination.WriterPool, stream types.StreamInterface) error

func (*AbstractDriver) Discover

func (a *AbstractDriver) Discover(ctx context.Context) ([]*types.Stream, error)

func (*AbstractDriver) GetConfigRef

func (a *AbstractDriver) GetConfigRef() Config

func (*AbstractDriver) Read

func (a *AbstractDriver) Read(ctx context.Context, pool *destination.WriterPool, standardStreams, cdcStreams []types.StreamInterface) error

Read handles different sync modes for data retrieval

func (*AbstractDriver) RunChangeStream

func (a *AbstractDriver) RunChangeStream(ctx context.Context, pool *destination.WriterPool, streams ...types.StreamInterface) error

func (*AbstractDriver) Setup

func (a *AbstractDriver) Setup(ctx context.Context) error

func (*AbstractDriver) SetupState

func (a *AbstractDriver) SetupState(state *types.State)

func (*AbstractDriver) Spec

func (a *AbstractDriver) Spec() any

func (*AbstractDriver) TestDiscover

func (a *AbstractDriver) TestDiscover(t *testing.T, conn interface{}, execQuery ExecuteQuery)

TestDiscover tests the discovery of tables

func (*AbstractDriver) TestRead

func (a *AbstractDriver) TestRead(t *testing.T, conn interface{}, execQuery ExecuteQuery)

TestRead tests full refresh and CDC read operations

func (*AbstractDriver) TestSetup

func (a *AbstractDriver) TestSetup(t *testing.T)

TestSetup tests the driver setup and connection check

type BackfillMsgFn

type BackfillMsgFn func(message map[string]any) error

type CDCChange

type CDCChange struct {
	Stream    types.StreamInterface
	Timestamp typeutils.Time
	Kind      string
	Data      map[string]interface{}
}

type CDCMsgFn

type CDCMsgFn func(message CDCChange) error

type Config

type Config interface {
	Validate() error
}

type DriverInterface

type DriverInterface interface {
	GetConfigRef() Config
	Spec() any
	Type() string
	// specific to test & setup
	Setup(ctx context.Context) error
	SetupState(state *types.State)
	// sync artifacts
	MaxConnections() int
	MaxRetries() int
	// specific to discover
	GetStreamNames(ctx context.Context) ([]string, error)
	ProduceSchema(ctx context.Context, stream string) (*types.Stream, error)
	// specific to backfill
	GetOrSplitChunks(ctx context.Context, pool *destination.WriterPool, stream types.StreamInterface) (*types.Set[types.Chunk], error)
	ChunkIterator(ctx context.Context, stream types.StreamInterface, chunk types.Chunk, processFn BackfillMsgFn) error
	// specific to cdc
	CDCSupported() bool
	PreCDC(ctx context.Context, streams []types.StreamInterface) error // to init state
	StreamChanges(ctx context.Context, stream types.StreamInterface, processFn CDCMsgFn) error
	PostCDC(ctx context.Context, stream types.StreamInterface, success bool) error // to save state
}

type ExecuteQuery

type ExecuteQuery func(ctx context.Context, t *testing.T, conn interface{}, tableName string, operation string)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL