models

package
v0.8.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 28, 2026 License: MIT Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var DISCOVER_MODE bool
View Source
var FULL_REFRESH bool
View Source
var STREAM_NAME string
View Source
var StateMutex sync.RWMutex

Functions

This section is empty.

Types

type AuthConfig

type AuthConfig struct {
	Required bool            `json:"required,omitempty"`
	Strategy string          `json:"strategy,omitempty"`
	Basic    BasicAuthConfig `json:"basic,omitempty"`
	Token    TokenAuthConfig `json:"token,omitempty"`
	OAuth    OAuthConfig     `json:"oauth,omitempty"`
}

type BasicAuthConfig

type BasicAuthConfig struct {
	Username string `json:"username,omitempty"`
	Password string `json:"password,omitempty"`
}

type Bookmark

type Bookmark struct {
	UpdatedAt string                   `json:"updated_at"`
	Latest    map[string]BookmarkEntry `json:"latest"`
}

type BookmarkEntry added in v0.8.0

type BookmarkEntry struct {
	SurrogateKey string `json:"surrogate_key"`
	LastSeen     string `json:"last_seen"`
}

BookmarkEntry tracks the surrogate key and last seen timestamp for a record

type Message

type Message struct {
	Type               string                 `json:"type"`
	Record             map[string]interface{} `json:"record,omitempty"`
	Stream             string                 `json:"stream,omitempty"`
	Schema             interface{}            `json:"schema,omitempty"`
	Value              interface{}            `json:"value,omitempty"`
	KeyProperties      []string               `json:"key_properties,omitempty"`
	BookmarkProperties []string               `json:"bookmark_properties,omitempty"`
	Required           []string               `json:"required,omitempty"`
}

type Model added in v0.8.0

type Model interface {
	Create(source ...interface{}) error
	Read() error
	Update() error
	Message() error
}

Model represents a persistable entity with common lifecycle operations. All models that implement this interface provide consistent methods for creation, reading, updating, and message generation. The Create method accepts variadic interface{} parameters for flexibility, allowing each implementation to define what source data it needs.

type OAuthConfig

type OAuthConfig struct {
	ClientID     string `json:"client_id,omitempty"`
	ClientSecret string `json:"client_secret,omitempty"`
	RefreshToken string `json:"refresh_token,omitempty"`
	TokenURL     string `json:"token_url,omitempty"`
}

type PaginationQueryConfig

type PaginationQueryConfig struct {
	QueryParameter string `json:"query_parameter,omitempty"`
	QueryValue     int    `json:"query_value,omitempty"`
	QueryIncrement int    `json:"query_increment,omitempty"`
}

type Record added in v0.8.0

type Record map[string]interface{}

Record represents a data record with transformation capabilities. It provides a type-safe wrapper around map[string]interface{} with convenient accessor methods, transformation logic, and message generation.

func (*Record) Create added in v0.8.0

func (r *Record) Create(source ...interface{}) error

Create initialises the Record from a source. Accepts map[string]interface{} or any value that can be converted to it. Expects a single parameter containing the record data.

func (Record) Get added in v0.8.0

func (r Record) Get(key string) interface{}

Get retrieves a value from the record

func (Record) Message added in v0.8.0

func (r Record) Message() error

Message generates a RECORD type message and writes it to stdout

func (Record) PassesBookmark added in v0.8.0

func (r Record) PassesBookmark() bool

PassesBookmark checks if the record should be emitted based on the bookmark state. Returns true if the record is new or has been updated since the last extraction. Always returns true when FULL_REFRESH or DISCOVER_MODE is enabled.

func (*Record) Read added in v0.8.0

func (r *Record) Read() error

Read reads the record (record no-op)

func (Record) Set added in v0.8.0

func (r Record) Set(key string, value interface{})

Set sets a value in the record

func (Record) ToMap added in v0.8.0

func (r Record) ToMap() map[string]interface{}

ToMap converts the Record back to a plain map

func (Record) Update added in v0.8.0

func (r Record) Update() error

Update applies transformations to the record including dropping fields, hashing sensitive fields, and generating surrogate keys

type RecordsConfig

type RecordsConfig struct {
	UniqueKeyPath       []string   `json:"unique_key_path,omitempty"`
	DropFieldPaths      [][]string `json:"drop_field_paths,omitempty"`
	SensitiveFieldPaths [][]string `json:"sensitive_field_paths,omitempty"`
}

type ResponseConfig

type ResponseConfig struct {
	RecordsPath        []string              `json:"records_path,omitempty"`
	Pagination         bool                  `json:"pagination,omitempty"`
	PaginationStrategy string                `json:"pagination_strategy,omitempty"`
	PaginationNextPath []string              `json:"pagination_next_path,omitempty"`
	PaginationQuery    PaginationQueryConfig `json:"pagination_query,omitempty"`
}

type RestConfig

type RestConfig struct {
	Auth     AuthConfig     `json:"auth,omitempty"`
	Response ResponseConfig `json:"response,omitempty"`
}

type Schema added in v0.8.0

type Schema map[string]interface{}

Schema represents a JSON schema with generation and update capabilities. It provides methods for working with JSON schemas including property management, schema generation from records, and schema merging. While similar to Model entities, Schema is an in-memory data structure with parameterized methods for flexibility.

func (*Schema) Create added in v0.8.0

func (s *Schema) Create(data ...interface{}) error

Create initialises a new Schema, optionally from existing data

func (*Schema) CreateFromRecord added in v0.8.0

func (s *Schema) CreateFromRecord(record interface{}) error

CreateFromRecord generates a schema from a record

func (Schema) IsEmpty added in v0.8.0

func (s Schema) IsEmpty() bool

IsEmpty returns true if the schema has no properties

func (*Schema) Merge added in v0.8.0

func (s *Schema) Merge(newRecord map[string]interface{}) error

Merge merges this schema with another schema (from a new record)

func (*Schema) Message added in v0.8.0

func (s *Schema) Message() error

Message generates a SCHEMA type message and writes it to stdout

func (Schema) Properties added in v0.8.0

func (s Schema) Properties() map[string]interface{}

Properties returns the properties of the schema

func (*Schema) Read added in v0.8.0

func (s *Schema) Read() error

Read reads the schema (placeholder for Model interface)

func (Schema) SetProperties added in v0.8.0

func (s Schema) SetProperties(properties map[string]interface{})

SetProperties sets the properties of the schema

func (Schema) ToMap added in v0.8.0

func (s Schema) ToMap() map[string]interface{}

ToMap converts the Schema back to a plain map

func (Schema) Update added in v0.8.0

func (s Schema) Update() error

Update updates the schema (placeholder for Model interface)

type StreamCatalog

type StreamCatalog struct {
	KeyProperties []string               `json:"key_properties"`
	Schema        map[string]interface{} `json:"schema"`
	Stream        string                 `json:"stream"`
}

StreamCatalog represents a stream's schema catalog and implements the Model interface. It manages the JSON schema definition, key properties, and provides validation capabilities for records against the catalog schema.

var DerivedCatalog StreamCatalog

func (*StreamCatalog) Create

func (c *StreamCatalog) Create(source ...interface{}) error

Create creates a catalog JSON file for the stream

func (*StreamCatalog) Message

func (c *StreamCatalog) Message() error

Message generates a schema message from the derived catalog

func (*StreamCatalog) Read

func (c *StreamCatalog) Read() error

Read the Catalog JSON file

func (*StreamCatalog) Update

func (c *StreamCatalog) Update() error

Update the Catalog JSON file

func (*StreamCatalog) ValidateRecordAgainstCatalog added in v0.8.0

func (c *StreamCatalog) ValidateRecordAgainstCatalog(record map[string]interface{}) (bool, error)

ValidateRecordAgainstCatalog validates record against Catalog

type StreamConfig

type StreamConfig struct {
	StreamName     string        `json:"stream_name,omitempty"`
	SourceType     string        `json:"source_type,omitempty"`
	URL            string        `json:"url,omitempty"`
	MaxConcurrency int           `json:"max_concurrency,omitempty"`
	Records        RecordsConfig `json:"records,omitempty"`
	Rest           RestConfig    `json:"rest,omitempty"`
}

StreamConfig represents the configuration for a data stream. It defines the source type, connection details, authentication, and record processing rules.

var Config StreamConfig

func (*StreamConfig) Create added in v0.8.0

func (c *StreamConfig) Create(source ...interface{}) error

Create loads the StreamConfig from a JSON file Expects a single string parameter containing the file path

func (*StreamConfig) Message added in v0.8.0

func (c *StreamConfig) Message() error

Message generates a configuration message (no-op for config)

func (*StreamConfig) Read added in v0.8.0

func (c *StreamConfig) Read() error

Read reads the configuration (JSON file is loaded via Create, so this is a no-op)

func (*StreamConfig) Update added in v0.8.0

func (c *StreamConfig) Update() error

Update updates the configuration (no-op for config)

type StreamState

type StreamState struct {
	Stream                  string   `json:"stream"`
	LastExtractionStartedAt string   `json:"last_extraction_started_at,omitempty"`
	Bookmark                Bookmark `json:"bookmark"`
}

StreamState represents a stream's extraction state and implements the Model interface. It maintains bookmarks for incremental extraction, tracking the latest processed records to enable resumable and incremental data extraction.

var State StreamState

func (*StreamState) Create

func (s *StreamState) Create(source ...interface{}) error

Create creates a state JSON file for the stream

func (*StreamState) Message

func (s *StreamState) Message() error

Message generates a message with the current state

func (*StreamState) Read

func (s *StreamState) Read() error

Read reads the State JSON file

func (*StreamState) StartExtraction added in v0.8.0

func (s *StreamState) StartExtraction()

StartExtraction sets the extraction start timestamp for this run

func (*StreamState) Update

func (s *StreamState) Update() error

Update writes the current state to the JSON file

func (*StreamState) UpdateBookmark added in v0.8.0

func (s *StreamState) UpdateBookmark(record map[string]interface{})

UpdateBookmark updates the bookmark with information from a record

type TokenAuthConfig

type TokenAuthConfig struct {
	Header      string `json:"header,omitempty"`
	HeaderValue string `json:"header_value,omitempty"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL