destination

package
v0.4.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 20, 2026 License: Apache-2.0 Imports: 7 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var RegisteredWriters = map[types.DestinationType]NewFunc{}

Functions

func ClearDestination added in v0.2.9

func ClearDestination(ctx context.Context, config *types.WriterConfig, dropStreams []types.StreamInterface) error

Types

type CloseFunction

type CloseFunction func()

type Config

type Config interface {
	Validate() error
}

type FlattenFunction

type FlattenFunction = func(record types.Record) (types.Record, error)

type InsertFunction

type InsertFunction func(record types.RawRecord) (err error)

type NewFunc

type NewFunc func() Writer

type Options

type Options struct {
	Identifier string
	Number     int64
	Backfill   bool
	ThreadID   string
}

type Stats added in v0.2.0

type Stats struct {
	TotalRecordsToSync atomic.Int64 // total record that are required to sync
	ReadCount          atomic.Int64 // records that got read
	ThreadCount        atomic.Int64 // total number of writer threads
}

type ThreadOptions

type ThreadOptions func(opt *Options)

func WithBackfill

func WithBackfill(backfill bool) ThreadOptions

func WithIdentifier

func WithIdentifier(identifier string) ThreadOptions

func WithNumber

func WithNumber(number int64) ThreadOptions

func WithThreadID added in v0.2.0

func WithThreadID(threadID string) ThreadOptions

type Write

type Write = func(ctx context.Context, channel <-chan types.Record) error

type Writer

type Writer interface {
	GetConfigRef() Config
	Spec() any
	Type() string
	// Sets up connections and perform checks; doesn't load Streams
	//
	// Note: Check shouldn't be called before Setup as they're composed at Connector level
	Check(ctx context.Context) error
	// Setup sets up an Adapter for dedicated use for a stream
	// avoiding the headover for different streams
	Setup(ctx context.Context, stream types.StreamInterface, schema any, opts *Options) (any, error)
	// Write function being used by drivers
	Write(ctx context.Context, record []types.RawRecord) error
	// flatten data and validates thread schema (return true if thread schema is different w.r.t records)
	FlattenAndCleanData(ctx context.Context, records []types.RawRecord) (bool, []types.RawRecord, any, error)
	// EvolveSchema updates the schema based on changes.
	// Need to pass olakeTimestamp as end argument to get the correct partition path based on record ingestion time.
	EvolveSchema(ctx context.Context, globalSchema, recordsSchema any) (any, error)
	// DropStreams is used to clear the destination before re-writing the stream
	DropStreams(ctx context.Context, dropStreams []types.StreamInterface) error
	Close(ctx context.Context) error
}

type WriterOption added in v0.1.6

type WriterOption func(Writer) error

type WriterPool

type WriterPool struct {
	// contains filtered or unexported fields
}

func NewWriterPool added in v0.2.0

func NewWriterPool(ctx context.Context, config *types.WriterConfig, syncStreams []string, batchSize int64) (*WriterPool, error)

func (*WriterPool) AddRecordsToSyncStats added in v0.2.0

func (w *WriterPool) AddRecordsToSyncStats(count int64)

func (*WriterPool) GetStats added in v0.2.0

func (w *WriterPool) GetStats() *Stats

func (*WriterPool) NewWriter added in v0.2.0

func (w *WriterPool) NewWriter(ctx context.Context, stream types.StreamInterface, options ...ThreadOptions) (*WriterThread, error)

type WriterThread added in v0.2.0

type WriterThread struct {
	// contains filtered or unexported fields
}

writer thread used by reader

func (*WriterThread) Close added in v0.2.0

func (wt *WriterThread) Close(ctx context.Context) (err error)

func (*WriterThread) Push added in v0.2.0

func (wt *WriterThread) Push(ctx context.Context, record types.RawRecord) error

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL