Documentation
¶
Index ¶
- Constants
- Variables
- type CloseFunction
- type Config
- type FlattenFunction
- type InsertFunction
- type NewFunc
- type Options
- type ThreadEvent
- type ThreadOptions
- type Write
- type Writer
- type WriterOption
- type WriterPool
- func (w *WriterPool) AddRecordsToSync(recordCount int64)
- func (w *WriterPool) GetRecordsToSync() int64
- func (w *WriterPool) NewThread(parent context.Context, stream types.StreamInterface, errChan chan error, ...) *ThreadEvent
- func (w *WriterPool) SyncedRecords() int64
- func (w *WriterPool) Wait() error
Constants ¶
View Source
const DestError = "destination error"
Variables ¶
View Source
var RegisteredWriters = map[types.AdapterType]NewFunc{}
Functions ¶
This section is empty.
Types ¶
type CloseFunction ¶
type CloseFunction func()
type InsertFunction ¶
type ThreadEvent ¶
type ThreadEvent struct {
Close CloseFunction
Insert InsertFunction
}
type ThreadOptions ¶
type ThreadOptions func(opt *Options)
func WithBackfill ¶
func WithBackfill(backfill bool) ThreadOptions
func WithIdentifier ¶
func WithIdentifier(identifier string) ThreadOptions
func WithNumber ¶
func WithNumber(number int64) ThreadOptions
type Writer ¶
type Writer interface {
GetConfigRef() Config
Spec() any
Type() string
// Sets up connections and perform checks; doesn't load Streams
//
// Note: Check shouldn't be called before Setup as they're composed at Connector level
Check(ctx context.Context) error
// Setup sets up an Adapter for dedicated use for a stream
// avoiding the overhead for different streams
Setup(stream types.StreamInterface, opts *Options) error
// Write function being used by drivers
Write(ctx context.Context, record types.RawRecord) error
// ReInitiationRequiredOnSchemaEvolution is implemented by Writers in case the writer needs to be re-initialized
// such as when writing parquet files, but destinations like Kafka/ClickHouse/BigQuery can handle a
// schema update with an Alter Query
Flattener() FlattenFunction
// EvolveSchema updates the schema based on changes.
// Need to pass olakeTimestamp as end argument to get the correct partition path based on record ingestion time.
EvolveSchema(bool, bool, map[string]*types.Property, types.Record, time.Time) error
// DropStreams is used to clear the destination before re-writing the stream
DropStreams(ctx context.Context, selectedStream []string) error
Close(ctx context.Context) error
}
type WriterOption ¶ added in v0.1.6
type WriterPool ¶
type WriterPool struct {
ThreadCounter atomic.Int64 // Used in naming files in S3 and global count for threads
// contains filtered or unexported fields
}
func NewWriter ¶
func NewWriter(ctx context.Context, config *types.WriterConfig, dropStreams []string) (*WriterPool, error)
NewWriter creates a new WriterPool with optional configuration
func (*WriterPool) AddRecordsToSync ¶
func (w *WriterPool) AddRecordsToSync(recordCount int64)
func (*WriterPool) GetRecordsToSync ¶
func (w *WriterPool) GetRecordsToSync() int64
func (*WriterPool) NewThread ¶
func (w *WriterPool) NewThread(parent context.Context, stream types.StreamInterface, errChan chan error, options ...ThreadOptions) *ThreadEvent
Initializes a new adapter thread for writing into the destination.
func (*WriterPool) SyncedRecords ¶
func (w *WriterPool) SyncedRecords() int64
Returns total records fetched at runtime
func (*WriterPool) Wait ¶
func (w *WriterPool) Wait() error
Click to show internal directories.
Click to hide internal directories.