models

package
v0.8.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 5, 2026 License: MIT Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var DISCOVER_MODE bool
View Source
var FULL_REFRESH bool
View Source
var STREAM_NAME string

Functions

This section is empty.

Types

type AuthConfig

type AuthConfig struct {
	Required bool            `json:"required,omitempty"`
	Strategy string          `json:"strategy,omitempty"`
	Basic    BasicAuthConfig `json:"basic,omitempty"`
	Token    TokenAuthConfig `json:"token,omitempty"`
	OAuth    OAuthConfig     `json:"oauth,omitempty"`
}

type BasicAuthConfig

type BasicAuthConfig struct {
	Username string `json:"username,omitempty"`
	Password string `json:"password,omitempty"`
}

type Bookmark

type Bookmark struct {
	UpdatedAt string                   `json:"updated_at"`
	Latest    map[string]BookmarkEntry `json:"latest"`
}

func (Bookmark) Clone added in v0.8.5

func (b Bookmark) Clone() Bookmark

type BookmarkEntry added in v0.8.0

type BookmarkEntry struct {
	SurrogateKey string `json:"surrogate_key"`
	LastSeen     string `json:"last_seen"`
	LastEmitted  string `json:"last_emitted,omitempty"`
}

BookmarkEntry tracks the surrogate key and last seen timestamp for a record

type BookmarkUpdate added in v0.8.5

type BookmarkUpdate struct {
	NaturalKey   interface{}
	SurrogateKey string
	Timestamp    string
	Emitted      bool
}

type Message

type Message struct {
	Type               string                 `json:"type"`
	Record             map[string]interface{} `json:"record,omitempty"`
	Stream             string                 `json:"stream,omitempty"`
	Schema             interface{}            `json:"schema,omitempty"`
	Value              interface{}            `json:"value,omitempty"`
	KeyProperties      []string               `json:"key_properties,omitempty"`
	BookmarkProperties []string               `json:"bookmark_properties,omitempty"`
	Required           []string               `json:"required,omitempty"`
}

type Model added in v0.8.0

type Model interface {
	Create(source ...interface{}) error
	Read() error
	Update() error
	Message() error
}

Model represents a persistable entity with common lifecycle operations. All models that implement this interface provide consistent methods for creation, reading, updating, and message generation. The Create method accepts variadic interface{} parameters for flexibility, allowing each implementation to define what source data it needs.

type OAuthConfig

type OAuthConfig struct {
	ClientID     string `json:"client_id,omitempty"`
	ClientSecret string `json:"client_secret,omitempty"`
	RefreshToken string `json:"refresh_token,omitempty"`
	TokenURL     string `json:"token_url,omitempty"`
}

type PaginationQueryConfig

type PaginationQueryConfig struct {
	QueryParameter string `json:"query_parameter,omitempty"`
	QueryValue     int    `json:"query_value,omitempty"`
	QueryIncrement int    `json:"query_increment,omitempty"`
}

type Record added in v0.8.0

type Record map[string]interface{}

Record represents a data record with transformation capabilities. It provides a type-safe wrapper around map[string]interface{} with convenient accessor methods, transformation logic, and message generation.

func (*Record) Create added in v0.8.0

func (r *Record) Create(source ...interface{}) error

Create initialises the Record from a source. Accepts map[string]interface{} or any value that can be converted to it. Expects a single parameter containing the record data.

func (Record) Get added in v0.8.0

func (r Record) Get(key string) interface{}

Get retrieves a value from the record

func (Record) Message added in v0.8.0

func (r Record) Message() error

Message generates a RECORD type message and writes it to stdout

func (Record) PassesBookmark added in v0.8.0

func (r Record) PassesBookmark() bool

PassesBookmark checks if the record should be emitted based on the bookmark state. Returns true if the record is new or has been updated since the last extraction. Always returns true when FULL_REFRESH or DISCOVER_MODE is enabled.

func (*Record) Read added in v0.8.0

func (r *Record) Read() error

Read reads the record (record no-op)

func (Record) Set added in v0.8.0

func (r Record) Set(key string, value interface{})

Set sets a value in the record

func (Record) ToMap added in v0.8.0

func (r Record) ToMap() map[string]interface{}

ToMap converts the Record back to a plain map

func (Record) Update added in v0.8.0

func (r Record) Update() error

Update applies transformations to the record including dropping fields, hashing sensitive fields, and generating surrogate keys

type RecordsConfig

type RecordsConfig struct {
	UniqueKeyPath       []string   `json:"unique_key_path,omitempty"`
	DropFieldPaths      [][]string `json:"drop_field_paths,omitempty"`
	SensitiveFieldPaths [][]string `json:"sensitive_field_paths,omitempty"`
}

type ResponseConfig

type ResponseConfig struct {
	RecordsPath        []string              `json:"records_path,omitempty"`
	Pagination         bool                  `json:"pagination,omitempty"`
	PaginationStrategy string                `json:"pagination_strategy,omitempty"`
	PaginationNextPath []string              `json:"pagination_next_path,omitempty"`
	PaginationQuery    PaginationQueryConfig `json:"pagination_query,omitempty"`
}

type RestConfig

type RestConfig struct {
	Auth     AuthConfig     `json:"auth,omitempty"`
	Response ResponseConfig `json:"response,omitempty"`
}

type Schema added in v0.8.0

type Schema map[string]interface{}

Schema represents a JSON schema with generation and update capabilities. It provides methods for working with JSON schemas including property management, schema generation from records, and schema merging. While similar to Model entities, Schema is an in-memory data structure with parameterized methods for flexibility.

func (*Schema) Create added in v0.8.0

func (s *Schema) Create(data ...interface{}) error

Create initialises a new Schema, optionally from existing data

func (*Schema) CreateFromRecord added in v0.8.0

func (s *Schema) CreateFromRecord(record interface{}) error

CreateFromRecord generates a schema from a record

func (Schema) IsEmpty added in v0.8.0

func (s Schema) IsEmpty() bool

IsEmpty returns true if the schema has no properties

func (*Schema) Merge added in v0.8.0

func (s *Schema) Merge(newRecord map[string]interface{}) error

Merge merges this schema with another schema (from a new record)

func (*Schema) Message added in v0.8.0

func (s *Schema) Message() error

Message generates a SCHEMA type message and writes it to stdout

func (Schema) Properties added in v0.8.0

func (s Schema) Properties() map[string]interface{}

Properties returns the properties of the schema

func (*Schema) Read added in v0.8.0

func (s *Schema) Read() error

Read reads the schema (placeholder for Model interface)

func (Schema) SetProperties added in v0.8.0

func (s Schema) SetProperties(properties map[string]interface{})

SetProperties sets the properties of the schema

func (Schema) ToMap added in v0.8.0

func (s Schema) ToMap() map[string]interface{}

ToMap converts the Schema back to a plain map

func (Schema) Update added in v0.8.0

func (s Schema) Update() error

Update updates the schema (placeholder for Model interface)

type StreamCatalog

type StreamCatalog struct {
	KeyProperties      []string `json:"key_properties"`
	Schema             Schema   `json:"schema"`
	SchemaDiscoveredAt string   `json:"schema_discovered_at,omitempty"`
	Stream             string   `json:"stream"`
}

StreamCatalog represents a stream's schema catalog and implements the Model interface. It manages the JSON schema definition, key properties, and provides validation capabilities for records against the catalog schema.

var DerivedCatalog StreamCatalog

func (*StreamCatalog) Create

func (c *StreamCatalog) Create(source ...interface{}) error

Create creates a catalog JSON file for the stream

func (*StreamCatalog) Message

func (c *StreamCatalog) Message() error

Message generates a schema message from the derived catalog

func (*StreamCatalog) Read

func (c *StreamCatalog) Read() error

Read the Catalog JSON file

func (*StreamCatalog) Update

func (c *StreamCatalog) Update() error

Update the Catalog JSON file

func (*StreamCatalog) ValidateRecordAgainstCatalog added in v0.8.0

func (c *StreamCatalog) ValidateRecordAgainstCatalog(record map[string]interface{}) (bool, error)

ValidateRecordAgainstCatalog validates record against Catalog

type StreamConfig

type StreamConfig struct {
	StreamName     string        `json:"stream_name,omitempty"`
	SourceType     string        `json:"source_type,omitempty"`
	URL            string        `json:"url,omitempty"`
	MaxConcurrency int           `json:"max_concurrency,omitempty"`
	Records        RecordsConfig `json:"records,omitempty"`
	Rest           RestConfig    `json:"rest,omitempty"`
}

StreamConfig represents the configuration for a data stream. It defines the source type, connection details, authentication, and record processing rules.

var Config StreamConfig

func (*StreamConfig) Create added in v0.8.0

func (c *StreamConfig) Create(source ...interface{}) error

Create loads the StreamConfig from a JSON file Expects a single string parameter containing the file path

func (*StreamConfig) Message added in v0.8.0

func (c *StreamConfig) Message() error

Message generates a configuration message (no-op for config)

func (*StreamConfig) Read added in v0.8.0

func (c *StreamConfig) Read() error

Read reads the configuration (JSON file is loaded via Create, so this is a no-op)

func (*StreamConfig) Update added in v0.8.0

func (c *StreamConfig) Update() error

Update updates the configuration (no-op for config)

type StreamState

type StreamState struct {
	Stream                  string   `json:"stream"`
	LastExtractionStartedAt string   `json:"last_extraction_started_at,omitempty"`
	Bookmark                Bookmark `json:"bookmark"`
	PreviousBookmark        Bookmark `json:"-"`
}

StreamState represents a stream's extraction state and implements the Model interface. It maintains bookmarks for incremental extraction, tracking the latest processed records to enable resumable and incremental data extraction.

var State StreamState

func (*StreamState) Create

func (s *StreamState) Create(source ...interface{}) error

Create creates a state JSON file for the stream

func (*StreamState) Message

func (s *StreamState) Message() error

Message generates a message with the current state

func (*StreamState) QueueBookmarkUpdate added in v0.8.5

func (s *StreamState) QueueBookmarkUpdate(record map[string]interface{}, emitted bool)

QueueBookmarkUpdate enqueues a bookmark mutation for the current extraction run.

func (*StreamState) Read

func (s *StreamState) Read() error

Read reads the State JSON file

func (*StreamState) StartBookmarkUpdates added in v0.8.5

func (s *StreamState) StartBookmarkUpdates()

StartBookmarkUpdates starts the single-writer goroutine that owns bookmark mutations.

func (*StreamState) StartExtraction added in v0.8.0

func (s *StreamState) StartExtraction()

StartExtraction sets the extraction start timestamp for this run

func (*StreamState) StopBookmarkUpdates added in v0.8.5

func (s *StreamState) StopBookmarkUpdates()

StopBookmarkUpdates drains outstanding bookmark updates before final state persistence.

func (*StreamState) Update

func (s *StreamState) Update() error

Update writes the current state to the JSON file

type TokenAuthConfig

type TokenAuthConfig struct {
	Header      string `json:"header,omitempty"`
	HeaderValue string `json:"header_value,omitempty"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL