Documentation
¶
Index ¶
- Variables
- func ReformatCursorValue(cursorField string, cursorValue any, stream types.StreamInterface) (any, error)
- type AbstractDriver
- func (a *AbstractDriver) Backfill(mainCtx context.Context, backfilledStreams chan string, ...) error
- func (a *AbstractDriver) ClearState(streams []types.StreamInterface) (*types.State, error)
- func (a *AbstractDriver) Discover(ctx context.Context, maxDiscoverThreads int) ([]*types.Stream, error)
- func (a *AbstractDriver) FormatCursorValue(cursorValue any) any
- func (a *AbstractDriver) GetConfigRef() Config
- func (a *AbstractDriver) Incremental(mainCtx context.Context, pool *destination.WriterPool, ...) error
- func (a *AbstractDriver) Read(ctx context.Context, pool *destination.WriterPool, ...) error
- func (a *AbstractDriver) RunChangeStream(mainCtx context.Context, pool *destination.WriterPool, ...) error
- func (a *AbstractDriver) Setup(ctx context.Context) error
- func (a *AbstractDriver) SetupState(state *types.State)
- func (a *AbstractDriver) Spec() any
- func (a *AbstractDriver) Type() string
- type BackfillMsgFn
- type CDCChange
- type CDCMsgFn
- type Config
- type DriverInterface
Constants ¶
This section is empty.
Variables ¶
View Source
var DefaultColumns = map[string]types.DataType{ constants.OlakeID: types.String, constants.OlakeTimestamp: types.TimestampMicro, constants.OpType: types.String, constants.CdcTimestamp: types.TimestampMicro, }
Functions ¶
func ReformatCursorValue ¶ added in v0.2.7
func ReformatCursorValue(cursorField string, cursorValue any, stream types.StreamInterface) (any, error)
RefomratCursorValue to parse the cursor value to the correct type
Types ¶
type AbstractDriver ¶
type AbstractDriver struct {
GlobalConnGroup *utils.CxGroup
GlobalCtxGroup *utils.CxGroup
// contains filtered or unexported fields
}
func NewAbstractDriver ¶
func NewAbstractDriver(ctx context.Context, driver DriverInterface) *AbstractDriver
func (*AbstractDriver) Backfill ¶
func (a *AbstractDriver) Backfill(mainCtx context.Context, backfilledStreams chan string, pool *destination.WriterPool, stream types.StreamInterface) error
func (*AbstractDriver) ClearState ¶ added in v0.2.9
func (a *AbstractDriver) ClearState(streams []types.StreamInterface) (*types.State, error)
func (*AbstractDriver) FormatCursorValue ¶ added in v0.3.14
func (a *AbstractDriver) FormatCursorValue(cursorValue any) any
FormatCursorValue is used to make time format and object id format consistent to be saved in state
func (*AbstractDriver) GetConfigRef ¶
func (a *AbstractDriver) GetConfigRef() Config
func (*AbstractDriver) Incremental ¶ added in v0.1.6
func (a *AbstractDriver) Incremental(mainCtx context.Context, pool *destination.WriterPool, streams ...types.StreamInterface) error
func (*AbstractDriver) Read ¶
func (a *AbstractDriver) Read(ctx context.Context, pool *destination.WriterPool, backfillStreams, cdcStreams, incrementalStreams []types.StreamInterface) error
func (*AbstractDriver) RunChangeStream ¶
func (a *AbstractDriver) RunChangeStream(mainCtx context.Context, pool *destination.WriterPool, streams ...types.StreamInterface) error
RunChangeStream orchestrates the CDC sync process: 1. Pre-CDC: Initialize driver-specific CDC state 2. Backfill: Load historical data for streams that need it 3. CDC: Start change data capture based on execution mode:
- Sequential: Process streams one at a time after all backfills complete
- Parallel: Process all streams simultaneously after all backfills complete
- Concurrent: Start each stream's CDC immediately after its backfill completes (can overlap)
func (*AbstractDriver) SetupState ¶
func (a *AbstractDriver) SetupState(state *types.State)
func (*AbstractDriver) Spec ¶
func (a *AbstractDriver) Spec() any
func (*AbstractDriver) Type ¶ added in v0.1.2
func (a *AbstractDriver) Type() string
type DriverInterface ¶
type DriverInterface interface {
GetConfigRef() Config
Spec() any
Type() string
// specific to test & setup
Setup(ctx context.Context) error
SetupState(state *types.State)
// sync artifacts
MaxConnections() int
MaxRetries() int
// specific to discover
GetStreamNames(ctx context.Context) ([]string, error)
ProduceSchema(ctx context.Context, stream string) (*types.Stream, error)
// specific to backfill
GetOrSplitChunks(ctx context.Context, pool *destination.WriterPool, stream types.StreamInterface) (*types.Set[types.Chunk], error)
ChunkIterator(ctx context.Context, stream types.StreamInterface, chunk types.Chunk, processFn BackfillMsgFn) error
//incremental specific
FetchMaxCursorValues(ctx context.Context, stream types.StreamInterface) (any, any, error)
StreamIncrementalChanges(ctx context.Context, stream types.StreamInterface, cb BackfillMsgFn) error
// specific to cdc
CDCSupported() bool
ChangeStreamConfig() (sequential bool, parallel bool, concurrent bool)
PreCDC(ctx context.Context, streams []types.StreamInterface) error // to init state
StreamChanges(ctx context.Context, identifier int, processFn CDCMsgFn) error
PostCDC(ctx context.Context, identifier int) error // to save state
}
Click to show internal directories.
Click to hide internal directories.