abstract

package
v0.3.12 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 8, 2026 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

Functions

func ReformatCursorValue added in v0.2.7

func ReformatCursorValue(cursorField string, cursorValue any, stream types.StreamInterface) (any, error)

RefomratCursorValue to parse the cursor value to the correct type

func RetryOnBackoff

func RetryOnBackoff(attempts int, sleep time.Duration, f func() error) (err error)

Types

type AbstractDriver

type AbstractDriver struct {
	GlobalConnGroup *utils.CxGroup
	GlobalCtxGroup  *utils.CxGroup
	// contains filtered or unexported fields
}

func NewAbstractDriver

func NewAbstractDriver(ctx context.Context, driver DriverInterface) *AbstractDriver

func (*AbstractDriver) Backfill

func (a *AbstractDriver) Backfill(ctx context.Context, backfilledStreams chan string, pool *destination.WriterPool, stream types.StreamInterface) error

func (*AbstractDriver) ClearState added in v0.2.9

func (a *AbstractDriver) ClearState(streams []types.StreamInterface) (*types.State, error)

func (*AbstractDriver) Discover

func (a *AbstractDriver) Discover(ctx context.Context) ([]*types.Stream, error)

func (*AbstractDriver) GetConfigRef

func (a *AbstractDriver) GetConfigRef() Config

func (*AbstractDriver) GetKafkaInterface added in v0.3.0

func (a *AbstractDriver) GetKafkaInterface() (KafkaInterface, bool)

func (*AbstractDriver) Incremental added in v0.1.6

func (a *AbstractDriver) Incremental(ctx context.Context, pool *destination.WriterPool, streams ...types.StreamInterface) error

func (*AbstractDriver) Read

func (a *AbstractDriver) Read(ctx context.Context, pool *destination.WriterPool, backfillStreams, cdcStreams, incrementalStreams []types.StreamInterface) error

func (*AbstractDriver) RunChangeStream

func (a *AbstractDriver) RunChangeStream(ctx context.Context, pool *destination.WriterPool, streams ...types.StreamInterface) error

func (*AbstractDriver) Setup

func (a *AbstractDriver) Setup(ctx context.Context) error

func (*AbstractDriver) SetupState

func (a *AbstractDriver) SetupState(state *types.State)

func (*AbstractDriver) Spec

func (a *AbstractDriver) Spec() any

func (*AbstractDriver) Type added in v0.1.2

func (a *AbstractDriver) Type() string

type BackfillMsgFn

type BackfillMsgFn func(ctx context.Context, message map[string]any) error

type CDCChange

type CDCChange struct {
	Stream    types.StreamInterface
	Timestamp time.Time
	Kind      string
	Data      map[string]interface{}
}

type CDCMsgFn

type CDCMsgFn func(ctx context.Context, message CDCChange) error

type Config

type Config interface {
	Validate() error
}

type DriverInterface

type DriverInterface interface {
	GetConfigRef() Config
	Spec() any
	Type() string
	// specific to test & setup
	Setup(ctx context.Context) error
	SetupState(state *types.State)
	// sync artifacts
	MaxConnections() int
	MaxRetries() int
	// specific to discover
	GetStreamNames(ctx context.Context) ([]string, error)
	ProduceSchema(ctx context.Context, stream string) (*types.Stream, error)
	// specific to backfill
	GetOrSplitChunks(ctx context.Context, pool *destination.WriterPool, stream types.StreamInterface) (*types.Set[types.Chunk], error)
	ChunkIterator(ctx context.Context, stream types.StreamInterface, chunk types.Chunk, processFn BackfillMsgFn) error
	//incremental specific
	FetchMaxCursorValues(ctx context.Context, stream types.StreamInterface) (any, any, error)
	StreamIncrementalChanges(ctx context.Context, stream types.StreamInterface, cb BackfillMsgFn) error
	// specific to cdc
	CDCSupported() bool
	PreCDC(ctx context.Context, streams []types.StreamInterface) error // to init state
	StreamChanges(ctx context.Context, stream types.StreamInterface, processFn CDCMsgFn) error
	PostCDC(ctx context.Context, stream types.StreamInterface, success bool, readerID string) error // to save state
}

type KafkaInterface added in v0.3.0

type KafkaInterface interface {
	DriverInterface
	GetReaderIDs() []string
	PartitionStreamChanges(ctx context.Context, readerID string, processFn CDCMsgFn) error
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL