types

package v0.1.1

Published: Jun 18, 2025 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

var (
	ErrStateMissing       = errors.New("stream missing from state")
	ErrStateCursorMissing = errors.New("cursor field missing from state")
)
var TypeWeights = map[DataType]int{
	Bool:           0,
	Int32:          1,
	Int64:          2,
	Float64:        3,
	Float32:        4,
	String:         5,
	TimestampNano:  9,
	TimestampMicro: 8,
	TimestampMilli: 7,
	Timestamp:      6,
}
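
TypeWeights assigns each DataType a relative weight that can be used, for example, to pick the broader of two detected types when merging schemas. A minimal sketch (the helper name maxDataType is illustrative, not part of this package's API):

func maxDataType(a, b DataType) DataType {
	// Higher weight wins; types missing from the map default to weight 0.
	if TypeWeights[a] >= TypeWeights[b] {
		return a
	}
	return b
}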

Functions

func LogCatalog

func LogCatalog(streams []*Stream, oldCatalog *Catalog)

func StreamsToMap

func StreamsToMap(streams ...*Stream) map[string]*Stream

Types

type ActionRow

type ActionRow struct {
}

type AdapterType

type AdapterType string
const (
	Parquet AdapterType = "PARQUET"
	Iceberg AdapterType = "ICEBERG"
)

type Catalog

type Catalog struct {
	SelectedStreams map[string][]StreamMetadata `json:"selected_streams,omitempty"`
	Streams         []*ConfiguredStream         `json:"streams,omitempty"`
}

Catalog is a DTO for formatted Airbyte catalog serialization

func GetWrappedCatalog

func GetWrappedCatalog(streams []*Stream) *Catalog
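
An illustrative discovery step that wraps freshly built streams into a Catalog (stream names are placeholders):

func buildCatalog() *Catalog {
	streams := []*Stream{
		NewStream("users", "public"),
		NewStream("orders", "public"),
	}
	// Returns a Catalog whose Streams field holds ConfiguredStream entries.
	return GetWrappedCatalog(streams)
}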

type Chunk

type Chunk struct {
	Min any `json:"min"`
	Max any `json:"max"`
}

Chunk holds the min and max boundary values of a chunk

type ConfiguredStream

type ConfiguredStream struct {
	StreamMetadata          StreamMetadata `json:"-"`
	InitialCursorStateValue any            `json:"-"` // Cached initial state value

	Stream *Stream `json:"stream,omitempty"`

	// Column that's being used as cursor; MUST NOT BE mutated
	//
	// The cursor field is used in Incremental and in Mixed-type CDC reads, where the connector
	// uses this field as a recovery column in case of inconsistencies
	CursorField    string   `json:"cursor_field,omitempty"`
	ExcludeColumns []string `json:"exclude_columns,omitempty"` // TODO: Implement excluding columns from fetching
}

ConfiguredStream is the input/processed object for a Stream

func (*ConfiguredStream) Cursor

func (s *ConfiguredStream) Cursor() string

func (*ConfiguredStream) GetStream

func (s *ConfiguredStream) GetStream() *Stream

func (*ConfiguredStream) GetSyncMode

func (s *ConfiguredStream) GetSyncMode() SyncMode

func (*ConfiguredStream) ID

func (s *ConfiguredStream) ID() string

func (*ConfiguredStream) Name

func (s *ConfiguredStream) Name() string

func (*ConfiguredStream) Namespace

func (s *ConfiguredStream) Namespace() string

func (*ConfiguredStream) NormalizationEnabled

func (s *ConfiguredStream) NormalizationEnabled() bool

func (*ConfiguredStream) Schema

func (s *ConfiguredStream) Schema() *TypeSchema

func (*ConfiguredStream) Self

func (*ConfiguredStream) SupportedSyncModes

func (s *ConfiguredStream) SupportedSyncModes() *Set[SyncMode]

func (*ConfiguredStream) Validate

func (s *ConfiguredStream) Validate(source *Stream) error

Validate validates the configured stream against the source stream

type ConnectionStatus

type ConnectionStatus string
const (
	ConnectionSucceed ConnectionStatus = "SUCCEEDED"
	ConnectionFailed  ConnectionStatus = "FAILED"
)

type DataType

type DataType string
const (
	Null           DataType = "null"
	Int32          DataType = "integer_small"
	Int64          DataType = "integer"
	Float32        DataType = "number_small"
	Float64        DataType = "number"
	String         DataType = "string"
	Bool           DataType = "boolean"
	Object         DataType = "object"
	Array          DataType = "array"
	Unknown        DataType = "unknown"
	Timestamp      DataType = "timestamp"
	TimestampMilli DataType = "timestamp_milli" // datetime with millisecond precision (3 digits)
	TimestampMicro DataType = "timestamp_micro" // datetime with microsecond precision (6 digits)
	TimestampNano  DataType = "timestamp_nano"  // datetime with nanosecond precision (9 digits)
)

func (DataType) ToNewParquet

func (d DataType) ToNewParquet() parquet.Node

type GlobalState

type GlobalState struct {
	// Global State shared by streams
	State any `json:"state"`
	// Attaching streams to the global state helps identify which tables the state belongs to.
	//
	// This lets the connector determine which streams were synced during the last group read,
	// and also lets connectors migrate from incremental to CDC reads without a full load, by
	// using the cursor field and value as a recovery cursor for CDC.
	Streams *Set[string] `json:"streams"`
}

type Hashable

type Hashable interface {
	Hash() string
}

type Identifier

type Identifier interface {
	ID() string
}

type Iterable

type Iterable interface {
	Next() bool
	Err() error
}

type Log

type Log struct {
	Level   string `json:"level,omitempty"`
	Message string `json:"message,omitempty"`
}

Log is a DTO for Airbyte log serialization

type Message

type Message struct {
	Type             MessageType            `json:"type"`
	Log              *Log                   `json:"log,omitempty"`
	ConnectionStatus *StatusRow             `json:"connectionStatus,omitempty"`
	State            *State                 `json:"state,omitempty"`
	Catalog          *Catalog               `json:"catalog,omitempty"`
	Action           *ActionRow             `json:"action,omitempty"`
	Spec             map[string]interface{} `json:"spec,omitempty"`
}

Message is a DTO for OLake output row representation
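
A minimal sketch of building a log row from the documented fields and constants:

func newLogMessage(level, msg string) Message {
	return Message{
		Type: LogMessage,
		Log:  &Log{Level: level, Message: msg},
	}
}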

type MessageType

type MessageType string
const (
	LogMessage              MessageType = "LOG"
	ConnectionStatusMessage MessageType = "CONNECTION_STATUS"
	StateMessage            MessageType = "STATE"
	RecordMessage           MessageType = "RECORD"
	CatalogMessage          MessageType = "CATALOG"
	SpecMessage             MessageType = "SPEC"
	ActionMessage           MessageType = "ACTION"
)

type Property

type Property struct {
	Type *Set[DataType] `json:"type,omitempty"`
}

Property is a DTO for catalog property representation

func (*Property) DataType

func (p *Property) DataType() DataType

func (*Property) Nullable

func (p *Property) Nullable() bool

type RawRecord

type RawRecord struct {
	Data           map[string]any `parquet:"data,json"`
	OlakeID        string         `parquet:"_olake_id"`
	OlakeTimestamp time.Time      `parquet:"_olake_timestamp"`
	OperationType  string         `parquet:"_op_type"` // "r" for read/backfill, "c" for create, "u" for update, "d" for delete
	CdcTimestamp   time.Time      `parquet:"_cdc_timestamp"`
}

func CreateRawRecord

func CreateRawRecord(olakeID string, data map[string]any, operationType string, cdcTimestamp time.Time) RawRecord
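
A minimal sketch of constructing a record for a create event (field values are placeholders; assumes the standard time package is imported):

func exampleRawRecord() RawRecord {
	// "c" denotes a create event, per the OperationType field comment above.
	return CreateRawRecord("id-123", map[string]any{"name": "demo"}, "c", time.Now())
}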

func (*RawRecord) ToDebeziumFormat

func (r *RawRecord) ToDebeziumFormat(db string, stream string, normalization bool) (string, error)

type Record

type Record map[string]any

type Set

type Set[T comparable] struct {
	// contains filtered or unexported fields
}

func NewSet

func NewSet[T comparable](initial ...T) *Set[T]

Create a new set
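
A minimal usage sketch based on the constructor and the methods listed below:

func exampleSet() {
	modes := NewSet(FULLREFRESH, INCREMENTAL) // T is inferred as SyncMode
	modes.Insert(CDC)

	if !modes.Exists(STRICTCDC) {
		modes = modes.Union(NewSet(STRICTCDC))
	}
	_ = modes.Len() // 4
}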

func (*Set[T]) Array

func (st *Set[T]) Array() []T

func (*Set[T]) Difference

func (st *Set[T]) Difference(set *Set[T]) *Set[T]

Find the difference between two sets

func (*Set[T]) Exists

func (st *Set[T]) Exists(element T) bool

Exists reports whether the element is in the set

func (*Set[T]) Hash

func (st *Set[T]) Hash(elem T) string

func (*Set[T]) Insert

func (st *Set[T]) Insert(elements ...T)

Insert adds one or more elements to the set

func (*Set[T]) Intersection

func (st *Set[T]) Intersection(set *Set[T]) *Set[T]

Find the intersection of two sets

func (*Set[T]) Len

func (st *Set[T]) Len() int

Return the number of items in the set

func (*Set[T]) MarshalJSON

func (st *Set[T]) MarshalJSON() ([]byte, error)

func (*Set[T]) ProperSubsetOf

func (st *Set[T]) ProperSubsetOf(set *Set[T]) bool

ProperSubsetOf reports whether st is a proper subset of "set"

func (*Set[T]) Range

func (st *Set[T]) Range(f func(T))

Call f for each item in the set

func (*Set[T]) Remove

func (st *Set[T]) Remove(element T)

Remove an element from the set

func (*Set[T]) String

func (st *Set[T]) String() string

func (*Set[T]) SubsetOf

func (st *Set[T]) SubsetOf(set *Set[T]) bool

SubsetOf reports whether st is a subset of "set"

func (*Set[T]) Union

func (st *Set[T]) Union(set *Set[T]) *Set[T]

Find the union of two sets

func (*Set[T]) UnmarshalJSON

func (st *Set[T]) UnmarshalJSON(data []byte) error

func (*Set[T]) WithHasher

func (st *Set[T]) WithHasher(f func(T) string) *Set[T]

type State

type State struct {
	*sync.RWMutex `json:"-"`
	Type          StateType      `json:"type"`
	Global        *GlobalState   `json:"global,omitempty"`
	Streams       []*StreamState `json:"streams,omitempty"` // TODO: make it set
}

State is a DTO for Airbyte state serialization. TODO: Add validation tags; write a custom unmarshaler that triggers validation.

func (*State) GetChunks

func (s *State) GetChunks(stream *ConfiguredStream) *Set[Chunk]

GetChunks retrieves all chunks for the given stream from the state.

func (*State) GetCursor

func (s *State) GetCursor(stream *ConfiguredStream, key string) any

func (*State) GetGlobal

func (s *State) GetGlobal() *GlobalState

func (*State) HasCompletedBackfill

func (s *State) HasCompletedBackfill(stream *ConfiguredStream) bool

func (*State) LogState

func (s *State) LogState()

func (*State) LogWithLock

func (s *State) LogWithLock()

func (*State) MarshalJSON

func (s *State) MarshalJSON() ([]byte, error)

func (*State) RemoveChunk

func (s *State) RemoveChunk(stream *ConfiguredStream, chunk Chunk) int

RemoveChunk removes the chunk and returns the remaining chunk count.

func (*State) ResetStreams

func (s *State) ResetStreams()

func (*State) SetChunks

func (s *State) SetChunks(stream *ConfiguredStream, chunks *Set[Chunk])

SetChunks stores the set of chunks for the given stream.

func (*State) SetCursor

func (s *State) SetCursor(stream *ConfiguredStream, key string, value any)

func (*State) SetGlobal

func (s *State) SetGlobal(state any, streams ...string)

SetGlobal sets the global state if state is not nil and streams are not empty.

func (*State) SetType

func (s *State) SetType(typ StateType)
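
A minimal sketch of typical incremental cursor bookkeeping against a framework-initialized *State (the helper name advanceCursor is illustrative):

func advanceCursor(s *State, cs *ConfiguredStream, latest any) any {
	prev := s.GetCursor(cs, cs.Cursor()) // may be nil on the first sync
	s.SetCursor(cs, cs.Cursor(), latest)
	return prev
}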

type StateInterface

type StateInterface interface {
	ResetStreams()
	SetType(typ StateType)
	GetCursor(stream *ConfiguredStream, key string) any
	SetCursor(stream *ConfiguredStream, key, value any)
	GetChunks(stream *ConfiguredStream) *Set[Chunk]
	SetChunks(stream *ConfiguredStream, chunks *Set[Chunk])
	RemoveChunk(stream *ConfiguredStream, chunk Chunk)
	SetGlobal(globalState any, streams ...string)
}

type StateType

type StateType string
const (
	// GlobalType indicates that the connector acts solely on globally shared state across streams
	GlobalType StateType = "GLOBAL"
	// StreamType indicates that the connector acts solely on individual stream state
	StreamType StateType = "STREAM"
	// MixedType indicates that the connector works with a mix of globally shared and
	// individual stream state (Note: not used yet, but planned)
	MixedType StateType = "MIXED"
	// ChunksKey is the constant key for chunks
	ChunksKey = "chunks"
)

type StatusRow

type StatusRow struct {
	Status  ConnectionStatus `json:"status,omitempty"`
	Message string           `json:"message,omitempty"`
}

StatusRow is a DTO for Airbyte result status serialization

type Stream

type Stream struct {
	// Name of the Stream
	Name string `json:"name,omitempty"`
	// Namespace of the Stream, i.e. the database it belongs to;
	// helps identify collections with the same name in different databases
	Namespace string `json:"namespace,omitempty"`
	// Possible Schema of the Stream
	Schema *TypeSchema `json:"type_schema,omitempty"`
	// Supported sync modes from driver for the respective Stream
	SupportedSyncModes *Set[SyncMode] `json:"supported_sync_modes,omitempty"`
	// Primary key if available
	SourceDefinedPrimaryKey *Set[string] `json:"source_defined_primary_key,omitempty"`
	// Available cursor fields supported by driver
	AvailableCursorFields *Set[string] `json:"available_cursor_fields,omitempty"`
	// Input of JSON Schema from Client to be parsed by driver
	AdditionalProperties string `json:"additional_properties,omitempty"`
	// Renderable JSON Schema for additional properties supported by respective driver for individual stream
	AdditionalPropertiesSchema schema.JSONSchema `json:"additional_properties_schema,omitempty"`
	SyncMode                   SyncMode          `json:"sync_mode,omitempty"` // Mode being used for syncing data
}

Stream is the output stream object for dsynk

func NewStream

func NewStream(name, namespace string) *Stream

func (*Stream) ID

func (s *Stream) ID() string

func (*Stream) UnmarshalJSON

func (s *Stream) UnmarshalJSON(data []byte) error

func (*Stream) UpsertField

func (s *Stream) UpsertField(column string, typ DataType, nullable bool)

UpsertField adds or updates a column in the stream's type schema

func (*Stream) WithCursorField

func (s *Stream) WithCursorField(columns ...string) *Stream

func (*Stream) WithPrimaryKey

func (s *Stream) WithPrimaryKey(keys ...string) *Stream

func (*Stream) WithSchema

func (s *Stream) WithSchema(schema *TypeSchema) *Stream

func (*Stream) WithSyncMode

func (s *Stream) WithSyncMode(modes ...SyncMode) *Stream

func (*Stream) Wrap

func (s *Stream) Wrap(_ int) *ConfiguredStream
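
An illustrative driver-side discovery snippet that builds a Stream and wraps it into a ConfiguredStream (names and columns are placeholders):

func discoverUsersStream() *ConfiguredStream {
	s := NewStream("users", "public").
		WithPrimaryKey("id").
		WithCursorField("updated_at").
		WithSyncMode(FULLREFRESH, INCREMENTAL, CDC)
	s.UpsertField("id", Int64, false)
	s.UpsertField("updated_at", TimestampMicro, true)
	return s.Wrap(0) // the int argument is ignored, per the Wrap signature above
}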

type StreamInterface

type StreamInterface interface {
	ID() string
	Self() *ConfiguredStream
	Name() string
	Namespace() string
	Schema() *TypeSchema
	GetStream() *Stream
	GetSyncMode() SyncMode
	SupportedSyncModes() *Set[SyncMode]
	Cursor() string
	Validate(source *Stream) error
	NormalizationEnabled() bool
}

type StreamMetadata

type StreamMetadata struct {
	ChunkColumn    string `json:"chunk_column,omitempty"`
	PartitionRegex string `json:"partition_regex"`
	StreamName     string `json:"stream_name"`
	AppendMode     bool   `json:"append_mode,omitempty"`
	Normalization  bool   `json:"normalization" default:"false"`
}

type StreamState

type StreamState struct {
	HoldsValue atomic.Bool `json:"-"` // true if the state holds some value and should not be excluded during unmarshaling
	Stream     string      `json:"stream"`
	Namespace  string      `json:"namespace"`
	SyncMode   string      `json:"sync_mode"`
	State      sync.Map    `json:"state"`
}

func (*StreamState) MarshalJSON

func (s *StreamState) MarshalJSON() ([]byte, error)

MarshalJSON is a custom marshaller that handles sync.Map encoding

func (*StreamState) UnmarshalJSON

func (s *StreamState) UnmarshalJSON(data []byte) error

UnmarshalJSON is a custom unmarshaller that handles sync.Map decoding

type SyncMode

type SyncMode string
const (
	FULLREFRESH SyncMode = "full_refresh"
	INCREMENTAL SyncMode = "incremental"
	CDC         SyncMode = "cdc"
	STRICTCDC   SyncMode = "strict_cdc"
)

type TypeSchema

type TypeSchema struct {
	Properties sync.Map `json:"-"`
	// contains filtered or unexported fields
}

func NewTypeSchema

func NewTypeSchema() *TypeSchema

func (*TypeSchema) AddTypes

func (t *TypeSchema) AddTypes(column string, types ...DataType)

func (*TypeSchema) GetProperty

func (t *TypeSchema) GetProperty(column string) (bool, *Property)

func (*TypeSchema) GetType

func (t *TypeSchema) GetType(column string) (DataType, error)
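
A minimal sketch of recording detected types for a column and reading the resolved type back (that Null marks the column nullable is an assumption, based on Property.Nullable):

func exampleTypeSchema() {
	ts := NewTypeSchema()
	ts.AddTypes("amount", Int64, Null)
	if dt, err := ts.GetType("amount"); err == nil {
		_ = dt // expected to resolve to Int64
	}
}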

func (*TypeSchema) MarshalJSON

func (t *TypeSchema) MarshalJSON() ([]byte, error)

MarshalJSON is a custom marshaller that handles sync.Map encoding

func (*TypeSchema) Override

func (t *TypeSchema) Override(fields map[string]*Property)

func (*TypeSchema) ToParquet

func (t *TypeSchema) ToParquet() *parquet.Schema

func (*TypeSchema) UnmarshalJSON

func (t *TypeSchema) UnmarshalJSON(data []byte) error

UnmarshalJSON is a custom unmarshaller that handles sync.Map decoding

type WriterConfig

type WriterConfig struct {
	Type         AdapterType `json:"type"`
	WriterConfig any         `json:"writer"`
}

TODO: Add validations
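
A minimal decoding sketch (assumes encoding/json is imported; the writer payload is a made-up example, not a documented config shape):

func exampleWriterConfig() (*WriterConfig, error) {
	raw := []byte(`{"type": "PARQUET", "writer": {"path": "/tmp/out"}}`)
	var cfg WriterConfig
	if err := json.Unmarshal(raw, &cfg); err != nil {
		return nil, err
	}
	return &cfg, nil // cfg.Type == Parquet; cfg.WriterConfig holds the raw writer object
}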
