destination

package
v0.1.3 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 29, 2025 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Index

Constants

View Source
const DestError = "destination error"

Variables

View Source
var RegisteredWriters = map[types.AdapterType]NewFunc{}

Functions

This section is empty.

Types

type CloseFunction

type CloseFunction func()

type Config

type Config interface {
	Validate() error
}

type FlattenFunction

type FlattenFunction = func(record types.Record) (types.Record, error)

type InsertFunction

type InsertFunction func(record types.RawRecord) (err error)

type NewFunc

type NewFunc func() Writer

type Options

type Options struct {
	Identifier string
	Number     int64
	Backfill   bool
}

type ThreadEvent

type ThreadEvent struct {
	Close  CloseFunction
	Insert InsertFunction
}

type ThreadOptions

type ThreadOptions func(opt *Options)

func WithBackfill

func WithBackfill(backfill bool) ThreadOptions

func WithIdentifier

func WithIdentifier(identifier string) ThreadOptions

func WithNumber

func WithNumber(number int64) ThreadOptions

type Write

type Write = func(ctx context.Context, channel <-chan types.Record) error

type Writer

type Writer interface {
	GetConfigRef() Config
	Spec() any
	Type() string
	// Sets up connections and perform checks; doesn't load Streams
	//
	// Note: Check shouldn't be called before Setup as they're composed at Connector level
	Check(ctx context.Context) error
	// Setup sets up an Adapter for dedicated use for a stream
	// avoiding the overhead for different streams
	Setup(stream types.StreamInterface, opts *Options) error
	// Write function being used by drivers
	Write(ctx context.Context, record types.RawRecord) error

	// ReInitiationRequiredOnSchemaEvolution is implemented by Writers in case the writer needs to be re-initialized
	// such as when writing parquet files, but in destinations like Kafka/Clickhouse/BigQuery they can handle
	// schema update with an Alter Query
	Flattener() FlattenFunction
	// EvolveSchema updates the schema based on changes.
	// Need to pass olakeTimestamp as the end argument to get the correct partition path based on record ingestion time.
	EvolveSchema(bool, bool, map[string]*types.Property, types.Record, time.Time) error
	Close(ctx context.Context) error
}

type WriterPool

type WriterPool struct {
	ThreadCounter atomic.Int64 // Used in naming files in S3 and global count for threads
	// contains filtered or unexported fields
}

func NewWriter

func NewWriter(ctx context.Context, config *types.WriterConfig) (*WriterPool, error)

Shouldn't the name be NewWriterPool?

func (*WriterPool) AddRecordsToSync

func (w *WriterPool) AddRecordsToSync(recordCount int64)

func (*WriterPool) GetRecordsToSync

func (w *WriterPool) GetRecordsToSync() int64

func (*WriterPool) NewThread

func (w *WriterPool) NewThread(parent context.Context, stream types.StreamInterface, errChan chan error, options ...ThreadOptions) *ThreadEvent

Initialize new adapter thread for writing into destination

func (*WriterPool) SyncedRecords

func (w *WriterPool) SyncedRecords() int64

Returns total records fetched at runtime

func (*WriterPool) Wait

func (w *WriterPool) Wait() error

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL