Documentation
¶
Index ¶
- Variables
- type AuthConfig
- type BasicAuthConfig
- type Bookmark
- type BookmarkEntry
- type BookmarkUpdate
- type Message
- type Model
- type OAuthConfig
- type PaginationQueryConfig
- type Record
- func (r *Record) Create(source ...interface{}) error
- func (r Record) Get(key string) interface{}
- func (r Record) Message() error
- func (r Record) PassesBookmark() bool
- func (r *Record) Read() error
- func (r Record) Set(key string, value interface{})
- func (r Record) ToMap() map[string]interface{}
- func (r Record) Update() error
- type RecordsConfig
- type ResponseConfig
- type RestConfig
- type Schema
- func (s *Schema) Create(data ...interface{}) error
- func (s *Schema) CreateFromRecord(record interface{}) error
- func (s Schema) IsEmpty() bool
- func (s *Schema) Merge(newRecord map[string]interface{}) error
- func (s *Schema) Message() error
- func (s Schema) Properties() map[string]interface{}
- func (s *Schema) Read() error
- func (s Schema) SetProperties(properties map[string]interface{})
- func (s Schema) ToMap() map[string]interface{}
- func (s Schema) Update() error
- type StreamCatalog
- type StreamConfig
- type StreamState
- func (s *StreamState) Create(source ...interface{}) error
- func (s *StreamState) Message() error
- func (s *StreamState) QueueBookmarkUpdate(record map[string]interface{}, emitted bool)
- func (s *StreamState) Read() error
- func (s *StreamState) StartBookmarkUpdates()
- func (s *StreamState) StartExtraction()
- func (s *StreamState) StopBookmarkUpdates()
- func (s *StreamState) Update() error
- type TokenAuthConfig
Constants ¶
This section is empty.
Variables ¶
var DISCOVER_MODE bool
var FULL_REFRESH bool
var STREAM_NAME string
Functions ¶
This section is empty.
Types ¶
type AuthConfig ¶
type AuthConfig struct {
Required bool `json:"required,omitempty"`
Strategy string `json:"strategy,omitempty"`
Basic BasicAuthConfig `json:"basic,omitempty"`
Token TokenAuthConfig `json:"token,omitempty"`
OAuth OAuthConfig `json:"oauth,omitempty"`
}
type BasicAuthConfig ¶
type Bookmark ¶
type Bookmark struct {
UpdatedAt string `json:"updated_at"`
Latest map[string]BookmarkEntry `json:"latest"`
}
type BookmarkEntry ¶ added in v0.8.0
type BookmarkEntry struct {
SurrogateKey string `json:"surrogate_key"`
LastSeen string `json:"last_seen"`
LastEmitted string `json:"last_emitted,omitempty"`
}
BookmarkEntry tracks the surrogate key and last seen timestamp for a record
type BookmarkUpdate ¶ added in v0.8.5
type Message ¶
type Message struct {
Type string `json:"type"`
Record map[string]interface{} `json:"record,omitempty"`
Stream string `json:"stream,omitempty"`
Schema interface{} `json:"schema,omitempty"`
Value interface{} `json:"value,omitempty"`
KeyProperties []string `json:"key_properties,omitempty"`
BookmarkProperties []string `json:"bookmark_properties,omitempty"`
Required []string `json:"required,omitempty"`
}
type Model ¶ added in v0.8.0
type Model interface {
Create(source ...interface{}) error
Read() error
Update() error
Message() error
}
Model represents a persistable entity with common lifecycle operations. All models that implement this interface provide consistent methods for creation, reading, updating, and message generation. The Create method accepts variadic interface{} parameters for flexibility, allowing each implementation to define what source data it needs.
type OAuthConfig ¶
type PaginationQueryConfig ¶
type Record ¶ added in v0.8.0
type Record map[string]interface{}
Record represents a data record with transformation capabilities. It provides a type-safe wrapper around map[string]interface{} with convenient accessor methods, transformation logic, and message generation.
func (*Record) Create ¶ added in v0.8.0
Create initialises the Record from a source. Accepts map[string]interface{} or any value that can be converted to it. Expects a single parameter containing the record data.
func (Record) Message ¶ added in v0.8.0
Message generates a RECORD type message and writes it to stdout
func (Record) PassesBookmark ¶ added in v0.8.0
PassesBookmark checks if the record should be emitted based on the bookmark state. Returns true if the record is new or has been updated since the last extraction. Always returns true when FULL_REFRESH or DISCOVER_MODE is enabled.
type RecordsConfig ¶
type ResponseConfig ¶
type ResponseConfig struct {
RecordsPath []string `json:"records_path,omitempty"`
Pagination bool `json:"pagination,omitempty"`
PaginationStrategy string `json:"pagination_strategy,omitempty"`
PaginationNextPath []string `json:"pagination_next_path,omitempty"`
PaginationQuery PaginationQueryConfig `json:"pagination_query,omitempty"`
}
type RestConfig ¶
type RestConfig struct {
Auth AuthConfig `json:"auth,omitempty"`
Response ResponseConfig `json:"response,omitempty"`
}
type Schema ¶ added in v0.8.0
type Schema map[string]interface{}
Schema represents a JSON schema with generation and update capabilities. It provides methods for working with JSON schemas including property management, schema generation from records, and schema merging. While similar to Model entities, Schema is an in-memory data structure with parameterized methods for flexibility.
func (*Schema) Create ¶ added in v0.8.0
Create initialises a new Schema, optionally from existing data
func (*Schema) CreateFromRecord ¶ added in v0.8.0
CreateFromRecord generates a schema from a record
func (*Schema) Merge ¶ added in v0.8.0
Merge merges this schema with another schema (from a new record)
func (*Schema) Message ¶ added in v0.8.0
Message generates a SCHEMA type message and writes it to stdout
func (Schema) Properties ¶ added in v0.8.0
Properties returns the properties of the schema
func (Schema) SetProperties ¶ added in v0.8.0
SetProperties sets the properties of the schema
type StreamCatalog ¶
type StreamCatalog struct {
KeyProperties []string `json:"key_properties"`
Schema Schema `json:"schema"`
SchemaDiscoveredAt string `json:"schema_discovered_at,omitempty"`
Stream string `json:"stream"`
}
StreamCatalog represents a stream's schema catalog and implements the Model interface. It manages the JSON schema definition, key properties, and provides validation capabilities for records against the catalog schema.
var DerivedCatalog StreamCatalog
func (*StreamCatalog) Create ¶
func (c *StreamCatalog) Create(source ...interface{}) error
Create creates a catalog JSON file for the stream
func (*StreamCatalog) Message ¶
func (c *StreamCatalog) Message() error
Message generates a schema message from the derived catalog
func (*StreamCatalog) ValidateRecordAgainstCatalog ¶ added in v0.8.0
func (c *StreamCatalog) ValidateRecordAgainstCatalog(record map[string]interface{}) (bool, error)
ValidateRecordAgainstCatalog validates record against Catalog
type StreamConfig ¶
type StreamConfig struct {
StreamName string `json:"stream_name,omitempty"`
SourceType string `json:"source_type,omitempty"`
URL string `json:"url,omitempty"`
MaxConcurrency int `json:"max_concurrency,omitempty"`
Records RecordsConfig `json:"records,omitempty"`
Rest RestConfig `json:"rest,omitempty"`
}
StreamConfig represents the configuration for a data stream. It defines the source type, connection details, authentication, and record processing rules.
var Config StreamConfig
func (*StreamConfig) Create ¶ added in v0.8.0
func (c *StreamConfig) Create(source ...interface{}) error
Create loads the StreamConfig from a JSON file Expects a single string parameter containing the file path
func (*StreamConfig) Message ¶ added in v0.8.0
func (c *StreamConfig) Message() error
Message generates a configuration message (no-op for config)
func (*StreamConfig) Read ¶ added in v0.8.0
func (c *StreamConfig) Read() error
Read reads the configuration (JSON file is loaded via Create, so this is a no-op)
func (*StreamConfig) Update ¶ added in v0.8.0
func (c *StreamConfig) Update() error
Update updates the configuration (no-op for config)
type StreamState ¶
type StreamState struct {
Stream string `json:"stream"`
LastExtractionStartedAt string `json:"last_extraction_started_at,omitempty"`
Bookmark Bookmark `json:"bookmark"`
PreviousBookmark Bookmark `json:"-"`
}
StreamState represents a stream's extraction state and implements the Model interface. It maintains bookmarks for incremental extraction, tracking the latest processed records to enable resumable and incremental data extraction.
var State StreamState
func (*StreamState) Create ¶
func (s *StreamState) Create(source ...interface{}) error
Create creates a state JSON file for the stream
func (*StreamState) Message ¶
func (s *StreamState) Message() error
Message generates a message with the current state
func (*StreamState) QueueBookmarkUpdate ¶ added in v0.8.5
func (s *StreamState) QueueBookmarkUpdate(record map[string]interface{}, emitted bool)
QueueBookmarkUpdate enqueues a bookmark mutation for the current extraction run.
func (*StreamState) StartBookmarkUpdates ¶ added in v0.8.5
func (s *StreamState) StartBookmarkUpdates()
StartBookmarkUpdates starts the single-writer goroutine that owns bookmark mutations.
func (*StreamState) StartExtraction ¶ added in v0.8.0
func (s *StreamState) StartExtraction()
StartExtraction sets the extraction start timestamp for this run
func (*StreamState) StopBookmarkUpdates ¶ added in v0.8.5
func (s *StreamState) StopBookmarkUpdates()
StopBookmarkUpdates drains outstanding bookmark updates before final state persistence.
func (*StreamState) Update ¶
func (s *StreamState) Update() error
Update writes the current state to the JSON file