Documentation
¶
Index ¶
- Variables
- func GetParquetRawSchema() *parquet.Schema
- func LogCatalog(streams []*Stream, oldCatalog *Catalog, driver string)
- func StreamsToMap(streams ...*Stream) map[string]*Stream
- type ActionRow
- type Catalog
- type Chunk
- type Condition
- type ConfiguredStream
- func (s *ConfiguredStream) Cursor() (string, string)
- func (s *ConfiguredStream) GetDestinationDatabase(icebergDB *string) string
- func (s *ConfiguredStream) GetDestinationTable() string
- func (s *ConfiguredStream) GetFilter() (Filter, error)
- func (s *ConfiguredStream) GetStream() *Stream
- func (s *ConfiguredStream) GetSyncMode() SyncMode
- func (s *ConfiguredStream) ID() string
- func (s *ConfiguredStream) Name() string
- func (s *ConfiguredStream) Namespace() string
- func (s *ConfiguredStream) NormalizationEnabled() bool
- func (s *ConfiguredStream) Schema() *TypeSchema
- func (s *ConfiguredStream) Self() *ConfiguredStream
- func (s *ConfiguredStream) SupportedSyncModes() *Set[SyncMode]
- func (s *ConfiguredStream) Validate(source *Stream) error
- type ConnectionStatus
- type DataType
- type DestinationType
- type Filter
- type GlobalState
- type Hashable
- type Identifier
- type Iterable
- type Log
- type Message
- type MessageType
- type PartitionKey
- type PartitionMetaData
- type Property
- type RawRecord
- type Record
- type Set
- func (st *Set[T]) Array() []T
- func (st *Set[T]) Difference(set *Set[T]) *Set[T]
- func (st *Set[T]) Exists(element T) bool
- func (st *Set[T]) Hash(elem T) string
- func (st *Set[T]) Insert(elements ...T)
- func (st *Set[T]) Intersection(set *Set[T]) *Set[T]
- func (st *Set[T]) Len() int
- func (st *Set[T]) MarshalJSON() ([]byte, error)
- func (st *Set[T]) ProperSubsetOf(set *Set[T]) bool
- func (st *Set[T]) Range(f func(T))
- func (st *Set[T]) Remove(element T)
- func (st *Set[T]) String() string
- func (st *Set[T]) SubsetOf(set *Set[T]) bool
- func (st *Set[T]) Union(set *Set[T]) *Set[T]
- func (st *Set[T]) UnmarshalJSON(data []byte) error
- func (st *Set[T]) WithHasher(f func(T) string) *Set[T]
- type State
- func (s *State) GetChunks(stream *ConfiguredStream) *Set[Chunk]
- func (s *State) GetCursor(stream *ConfiguredStream, key string) any
- func (s *State) GetGlobal() *GlobalState
- func (s *State) HasCompletedBackfill(stream *ConfiguredStream) bool
- func (s *State) LogState()
- func (s *State) LogWithLock()
- func (s *State) MarshalJSON() ([]byte, error)
- func (s *State) RemoveChunk(stream *ConfiguredStream, chunk Chunk) int
- func (s *State) ResetCursor(stream *ConfiguredStream)
- func (s *State) ResetStreams()
- func (s *State) SetChunks(stream *ConfiguredStream, chunks *Set[Chunk])
- func (s *State) SetCursor(stream *ConfiguredStream, key string, value any)
- func (s *State) SetGlobal(state any, streams ...string)
- func (s *State) SetType(typ StateType)
- type StateInterface
- type StateType
- type StatusRow
- type Stream
- func (s *Stream) ID() string
- func (s *Stream) UnmarshalJSON(data []byte) error
- func (s *Stream) UpsertField(column string, typ DataType, nullable bool)
- func (s *Stream) WithCursorField(columns ...string) *Stream
- func (s *Stream) WithPrimaryKey(keys ...string) *Stream
- func (s *Stream) WithSchema(schema *TypeSchema) *Stream
- func (s *Stream) WithSyncMode(modes ...SyncMode) *Stream
- func (s *Stream) Wrap(_ int) *ConfiguredStream
- type StreamInterface
- type StreamMetadata
- type StreamState
- type SyncMode
- type TypeSchema
- func (t *TypeSchema) AddTypes(column string, types ...DataType)
- func (t *TypeSchema) GetProperty(column string) (bool, *Property)
- func (t *TypeSchema) GetType(column string) (DataType, error)
- func (t *TypeSchema) HasDestinationColumnName() bool
- func (t *TypeSchema) MarshalJSON() ([]byte, error)
- func (t *TypeSchema) Override(fields map[string]*Property)
- func (t *TypeSchema) ToIceberg() []*proto.IcebergPayload_SchemaField
- func (t *TypeSchema) ToParquet() *parquet.Schema
- func (t *TypeSchema) UnmarshalJSON(data []byte) error
- type WriterConfig
Constants ¶
This section is empty.
Variables ¶
var ( ErrStateMissing = errors.New("stream missing from state") ErrStateCursorMissing = errors.New("cursor field missing from state") )
var RawSchema = map[string]DataType{ constants.StringifiedData: String, constants.CdcTimestamp: Timestamp, constants.OlakeTimestamp: Timestamp, constants.OpType: String, constants.OlakeID: String, }
var TypeWeights = map[DataType]int{ Bool: 0, Int32: 1, Int64: 2, Float64: 3, Float32: 4, String: 5, TimestampNano: 9, TimestampMicro: 8, TimestampMilli: 7, Timestamp: 6, }
Functions ¶
func GetParquetRawSchema ¶ added in v0.2.7
func LogCatalog ¶
func StreamsToMap ¶
Types ¶
type Catalog ¶
type Catalog struct {
SelectedStreams map[string][]StreamMetadata `json:"selected_streams,omitempty"`
Streams []*ConfiguredStream `json:"streams,omitempty"`
}
Catalog is a DTO for formatted Airbyte catalog serialization.
func GetStreamsDelta ¶ added in v0.2.9
GetStreamsDelta compares two catalogs and returns a new catalog containing the streams that have differences. Only selected streams are compared:
1. Compares properties from selected_streams: normalization, partition_regex, filter, append_mode.
2. Compares properties from streams: destination_database, destination_table, cursor_field, sync_mode.
3. For now, any new stream present in the new catalog is added to the difference. Collision detection will be added later.
Parameters:
- oldStreams: The previous catalog to compare against
- newStreams: The current catalog with potential changes
Returns:
- A catalog containing only the streams that have differences
func GetWrappedCatalog ¶
type ConfiguredStream ¶
type ConfiguredStream struct {
StreamMetadata StreamMetadata `json:"-"`
Stream *Stream `json:"stream,omitempty"`
}
ConfiguredStream is the input/processed object for a Stream.
func (*ConfiguredStream) Cursor ¶
func (s *ConfiguredStream) Cursor() (string, string)
Cursor returns the primary and secondary cursor.
func (*ConfiguredStream) GetDestinationDatabase ¶ added in v0.2.0
func (s *ConfiguredStream) GetDestinationDatabase(icebergDB *string) string
func (*ConfiguredStream) GetDestinationTable ¶ added in v0.2.0
func (s *ConfiguredStream) GetDestinationTable() string
func (*ConfiguredStream) GetFilter ¶ added in v0.1.6
func (s *ConfiguredStream) GetFilter() (Filter, error)
func (*ConfiguredStream) GetStream ¶
func (s *ConfiguredStream) GetStream() *Stream
func (*ConfiguredStream) GetSyncMode ¶
func (s *ConfiguredStream) GetSyncMode() SyncMode
func (*ConfiguredStream) ID ¶
func (s *ConfiguredStream) ID() string
func (*ConfiguredStream) Name ¶
func (s *ConfiguredStream) Name() string
func (*ConfiguredStream) Namespace ¶
func (s *ConfiguredStream) Namespace() string
func (*ConfiguredStream) NormalizationEnabled ¶
func (s *ConfiguredStream) NormalizationEnabled() bool
func (*ConfiguredStream) Schema ¶
func (s *ConfiguredStream) Schema() *TypeSchema
func (*ConfiguredStream) Self ¶
func (s *ConfiguredStream) Self() *ConfiguredStream
func (*ConfiguredStream) SupportedSyncModes ¶
func (s *ConfiguredStream) SupportedSyncModes() *Set[SyncMode]
func (*ConfiguredStream) Validate ¶
func (s *ConfiguredStream) Validate(source *Stream) error
Validate validates the configured stream against the source stream.
type ConnectionStatus ¶
type ConnectionStatus string
const ( ConnectionSucceed ConnectionStatus = "SUCCEEDED" ConnectionFailed ConnectionStatus = "FAILED" )
type DataType ¶
type DataType string
const ( Null DataType = "null" Int32 DataType = "integer_small" Int64 DataType = "integer" Float32 DataType = "number_small" Float64 DataType = "number" String DataType = "string" Bool DataType = "boolean" Object DataType = "object" Array DataType = "array" Unknown DataType = "unknown" Timestamp DataType = "timestamp" TimestampMilli DataType = "timestamp_milli" // stores datetime with up to 3 digits of sub-second precision TimestampMicro DataType = "timestamp_micro" // stores datetime with up to 6 digits of sub-second precision TimestampNano DataType = "timestamp_nano" // stores datetime with up to 9 digits of sub-second precision )
func GetCommonAncestorType ¶ added in v0.1.7
GetCommonAncestorType returns the lowest common ancestor type.
func IcebergTypeToDatatype ¶ added in v0.2.0
func (DataType) ToNewParquet ¶
type DestinationType ¶ added in v0.2.0
type DestinationType string
const ( Parquet DestinationType = "PARQUET" Iceberg DestinationType = "ICEBERG" )
type Filter ¶ added in v0.1.6
type Filter struct {
Conditions []Condition // a > b, a < b
LogicalOperator string // condition[0] and/or condition[1], single and/or supported
}
Filter represents the parsed filter
type GlobalState ¶
type GlobalState struct {
// Global State shared by streams
State any `json:"state"`
// Attaching Streams to Global State helps in recognizing the tables that the state belongs to.
//
// This helps the connector determine which streams were synced during the last sync in a
// group read. It also lets connectors migrate from incremental to CDC read without a
// full load, by using the cursor value and field as the recovery cursor for CDC.
Streams *Set[string] `json:"streams"`
}
type Identifier ¶
type Identifier interface {
ID() string
}
type Message ¶
type Message struct {
Type MessageType `json:"type"`
Log *Log `json:"log,omitempty"`
ConnectionStatus *StatusRow `json:"connectionStatus,omitempty"`
State *State `json:"state,omitempty"`
Catalog *Catalog `json:"catalog,omitempty"`
Action *ActionRow `json:"action,omitempty"`
Spec map[string]interface{} `json:"spec,omitempty"`
}
Message is a DTO for OLake output row representation.
type MessageType ¶
type MessageType string
const ( LogMessage MessageType = "LOG" ConnectionStatusMessage MessageType = "CONNECTION_STATUS" StateMessage MessageType = "STATE" RecordMessage MessageType = "RECORD" CatalogMessage MessageType = "CATALOG" SpecMessage MessageType = "SPEC" ActionMessage MessageType = "ACTION" )
type PartitionKey ¶ added in v0.3.0
PartitionKey represents a unique key for a Kafka partition and topic
type PartitionMetaData ¶ added in v0.3.0
type PartitionMetaData struct {
ReaderID string
Stream StreamInterface
PartitionID int
EndOffset int64
}
PartitionMetaData holds metadata about a Kafka partition for a specific stream reader
type Property ¶
type Property struct {
Type *Set[DataType] `json:"type,omitempty"`
DestinationColumnName string `json:"destination_column_name,omitempty"`
}
Property is a dto for catalog properties representation
type RawRecord ¶
type RawRecord struct {
Data map[string]any `parquet:"data,json"`
OlakeID string `parquet:"_olake_id"`
OlakeTimestamp time.Time `parquet:"_olake_timestamp"`
OperationType string `parquet:"_op_type"` // "r" for read/backfill, "c" for create, "u" for update, "d" for delete
CdcTimestamp *time.Time `parquet:"_cdc_timestamp"` // pointer because it will only be available for cdc sync
}
type Set ¶
type Set[T comparable] struct { // contains filtered or unexported fields }
func (*Set[T]) Difference ¶
Difference finds the difference between two sets.
func (*Set[T]) Intersection ¶
Intersection finds the intersection of two sets.
func (*Set[T]) MarshalJSON ¶
func (*Set[T]) ProperSubsetOf ¶
ProperSubsetOf tests whether st is a proper subset of set.
func (*Set[T]) UnmarshalJSON ¶
func (*Set[T]) WithHasher ¶
type State ¶
type State struct {
*sync.RWMutex `json:"-"`
Type StateType `json:"type"`
Global *GlobalState `json:"global,omitempty"`
Streams []*StreamState `json:"streams,omitempty"` // TODO: make it set
}
TODO: Add validation tags; write a custom unmarshaler that triggers validation.

State is a DTO for Airbyte state serialization.
func (*State) GetChunks ¶
func (s *State) GetChunks(stream *ConfiguredStream) *Set[Chunk]
GetChunks retrieves the chunks for the given stream from the state.
func (*State) GetGlobal ¶
func (s *State) GetGlobal() *GlobalState
func (*State) HasCompletedBackfill ¶
func (s *State) HasCompletedBackfill(stream *ConfiguredStream) bool
func (*State) LogWithLock ¶
func (s *State) LogWithLock()
func (*State) MarshalJSON ¶
func (*State) RemoveChunk ¶
func (s *State) RemoveChunk(stream *ConfiguredStream, chunk Chunk) int
RemoveChunk removes the given chunk and returns the remaining chunk count.
func (*State) ResetCursor ¶ added in v0.1.9
func (s *State) ResetCursor(stream *ConfiguredStream)
func (*State) ResetStreams ¶
func (s *State) ResetStreams()
func (*State) SetChunks ¶
func (s *State) SetChunks(stream *ConfiguredStream, chunks *Set[Chunk])
SetChunks sets the chunks for the given stream.
func (*State) SetCursor ¶
func (s *State) SetCursor(stream *ConfiguredStream, key string, value any)
type StateInterface ¶
type StateInterface interface {
ResetStreams()
SetType(typ StateType)
GetCursor(stream *ConfiguredStream, key string) any
SetCursor(stream *ConfiguredStream, key, value any)
GetChunks(stream *ConfiguredStream) *Set[Chunk]
SetChunks(stream *ConfiguredStream, chunks *Set[Chunk])
RemoveChunk(stream *ConfiguredStream, chunk Chunk)
SetGlobal(globalState any, streams ...string)
}
type StateType ¶
type StateType string
const ( // Global Type indicates that the connector solely acts on Globally shared state across streams GlobalType StateType = "GLOBAL" // Stream Type indicates that the connector solely acts on individual stream state StreamType StateType = "STREAM" // Mixed type indicates that the connector works with a mix of Globally shared and // Individual stream state (Note: not used yet, but planned) MixedType StateType = "MIXED" // constant key for chunks ChunksKey = "chunks" )
type StatusRow ¶
type StatusRow struct {
Status ConnectionStatus `json:"status,omitempty"`
Message string `json:"message,omitempty"`
}
StatusRow is a dto for airbyte result status serialization
type Stream ¶
type Stream struct {
// Name of the Stream
Name string `json:"name,omitempty"`
// Namespace of the Stream, or Database it belongs to
// helps in identifying collections with same name in different database
Namespace string `json:"namespace,omitempty"`
// Possible Schema of the Stream
Schema *TypeSchema `json:"type_schema,omitempty"`
// Supported sync modes from driver for the respective Stream
SupportedSyncModes *Set[SyncMode] `json:"supported_sync_modes,omitempty"`
// Primary key if available
SourceDefinedPrimaryKey *Set[string] `json:"source_defined_primary_key,omitempty"`
// Available cursor fields supported by driver
AvailableCursorFields *Set[string] `json:"available_cursor_fields,omitempty"`
// Input of JSON Schema from Client to be parsed by driver
AdditionalProperties string `json:"additional_properties,omitempty"`
// Cursor field to be used for incremental sync
CursorField string `json:"cursor_field,omitempty"`
// Mode being used for syncing data
SyncMode SyncMode `json:"sync_mode,omitempty"`
// Normalized Destination Database and Table used as default values for destination database and table
DestinationDatabase string `json:"destination_database,omitempty"`
DestinationTable string `json:"destination_table,omitempty"`
}
Stream is the output stream object for dsynk.
func (*Stream) UnmarshalJSON ¶
func (*Stream) UpsertField ¶
UpsertField adds or updates a column in the stream's type schema.
func (*Stream) WithCursorField ¶
func (*Stream) WithPrimaryKey ¶
func (*Stream) WithSchema ¶
func (s *Stream) WithSchema(schema *TypeSchema) *Stream
func (*Stream) WithSyncMode ¶
func (*Stream) Wrap ¶
func (s *Stream) Wrap(_ int) *ConfiguredStream
type StreamInterface ¶
type StreamInterface interface {
ID() string
Self() *ConfiguredStream
Name() string
Namespace() string
Schema() *TypeSchema
GetStream() *Stream
GetSyncMode() SyncMode
GetFilter() (Filter, error)
SupportedSyncModes() *Set[SyncMode]
Cursor() (string, string)
Validate(source *Stream) error
NormalizationEnabled() bool
GetDestinationDatabase(icebergDB *string) string
GetDestinationTable() string
}
type StreamMetadata ¶
type StreamState ¶
type StreamState struct {
HoldsValue atomic.Bool `json:"-"` // If State holds some value and should not be excluded during unmarshaling then value true
Stream string `json:"stream"`
Namespace string `json:"namespace"`
SyncMode string `json:"sync_mode"`
State sync.Map `json:"state"`
}
func (*StreamState) MarshalJSON ¶
func (s *StreamState) MarshalJSON() ([]byte, error)
MarshalJSON custom marshaller to handle sync.Map encoding
func (*StreamState) UnmarshalJSON ¶
func (s *StreamState) UnmarshalJSON(data []byte) error
UnmarshalJSON custom unmarshaller to handle sync.Map decoding
type TypeSchema ¶
func NewTypeSchema ¶
func NewTypeSchema() *TypeSchema
func (*TypeSchema) AddTypes ¶
func (t *TypeSchema) AddTypes(column string, types ...DataType)
func (*TypeSchema) GetProperty ¶
func (t *TypeSchema) GetProperty(column string) (bool, *Property)
func (*TypeSchema) HasDestinationColumnName ¶ added in v0.2.0
func (t *TypeSchema) HasDestinationColumnName() bool
func (*TypeSchema) MarshalJSON ¶
func (t *TypeSchema) MarshalJSON() ([]byte, error)
MarshalJSON custom marshaller to handle sync.Map encoding
func (*TypeSchema) Override ¶
func (t *TypeSchema) Override(fields map[string]*Property)
func (*TypeSchema) ToIceberg ¶ added in v0.2.0
func (t *TypeSchema) ToIceberg() []*proto.IcebergPayload_SchemaField
func (*TypeSchema) ToParquet ¶
func (t *TypeSchema) ToParquet() *parquet.Schema
func (*TypeSchema) UnmarshalJSON ¶
func (t *TypeSchema) UnmarshalJSON(data []byte) error
UnmarshalJSON custom unmarshaller to handle sync.Map decoding
type WriterConfig ¶
type WriterConfig struct {
Type DestinationType `json:"type"`
WriterConfig any `json:"writer"`
}
TODO: Add validations