Documentation
¶
Overview ¶
Package models provides template rendering functionality for model SQL transformations.
Index ¶
- Constants
- Variables
- func FormatModelID(database, table string) string
- func ParseModelID(modelID string) (database, table string, err error)
- type CacheOverride
- type Config
- type DAGReader
- type DependencyGraph
- func (d *DependencyGraph) AddExternalModels(models []External) error
- func (d *DependencyGraph) AddTransformationEdges(models []Transformation) error
- func (d *DependencyGraph) AddTransformationModels(models []Transformation) error
- func (d *DependencyGraph) BuildGraph(transformationModels []Transformation, externalModels []External) error
- func (d *DependencyGraph) GetAllDependencies(modelID string) []string
- func (d *DependencyGraph) GetAllDependents(modelID string) []string
- func (d *DependencyGraph) GetDependencies(modelID string) []string
- func (d *DependencyGraph) GetDependents(modelID string) []string
- func (d *DependencyGraph) GetExternalNode(modelID string) (External, error)
- func (d *DependencyGraph) GetExternalNodes() []Node
- func (d *DependencyGraph) GetNode(modelID string) (Node, error)
- func (d *DependencyGraph) GetStructuredDependencies(modelID string) []transformation.Dependency
- func (d *DependencyGraph) GetTransformationNode(modelID string) (Transformation, error)
- func (d *DependencyGraph) GetTransformationNodes() []Transformation
- func (d *DependencyGraph) IsPathBetween(fromModelID, toModelID string) bool
- type External
- type ExternalConfig
- type ExternalOverride
- type ExternalType
- type IntervalOverride
- type LimitsOverride
- type ModelFile
- type ModelOverride
- func (m *ModelOverride) ApplyToExternal(config *external.Config)
- func (m *ModelOverride) ApplyToTransformation(model Transformation)
- func (m *ModelOverride) IsDisabled() bool
- func (m *ModelOverride) ResolveConfig(actualType ModelType) error
- func (m *ModelOverride) UnmarshalYAML(node *yaml.Node) error
- type ModelType
- type Node
- type NodeType
- type OverrideConfig
- type SchedulesOverride
- type Service
- type TemplateEngine
- func (t *TemplateEngine) GetTransformationEnvironmentVariables(model Transformation, position, interval uint64, startTime time.Time) (*[]string, error)
- func (t *TemplateEngine) RenderExternal(model External, cacheState map[string]interface{}) (string, error)
- func (t *TemplateEngine) RenderTransformation(model Transformation, position, interval uint64, startTime time.Time) (string, error)
- type Transformation
- type TransformationConfig
- type TransformationOverride
- type TransformationType
Constants ¶
const ( // TransformationTypeSQL represents SQL transformation models TransformationTypeSQL TransformationType = transformation.TransformationTypeSQL // TransformationTypeExec represents exec transformation models TransformationTypeExec TransformationType = transformation.TransformationTypeExec // ExtSQL is the SQL file extension ExtSQL = ".sql" // ExtYAML is the YAML file extension ExtYAML = ".yaml" // ExtYML is the alternative YAML file extension ExtYML = ".yml" )
Variables ¶
var ( // ErrNonExistentDependency is returned when a model depends on a non-existent model ErrNonExistentDependency = errors.New("model depends on non-existent model") // ErrNotExternalModel is returned when a model is not an external model ErrNotExternalModel = errors.New("model is not an external model") // ErrNotTransformationModel is returned when a model is not a transformation model ErrNotTransformationModel = errors.New("model is not a transformation model") // ErrInvalidNodeType is returned when a node has an invalid type ErrInvalidNodeType = errors.New("invalid node type") // ErrInvalidExternalModelType is returned when an external model has an invalid type ErrInvalidExternalModelType = errors.New("invalid external model type") // ErrInvalidTransformationModelType is returned when a transformation model has an invalid type ErrInvalidTransformationModelType = errors.New("invalid transformation model type") // ErrIncompatibleIntervalType is returned when incremental model cannot depend on model with different interval_type ErrIncompatibleIntervalType = errors.New("incremental model cannot depend on model with different interval_type") )
var ( // ErrInvalidExternalType is returned when an invalid external type is specified ErrInvalidExternalType = errors.New("invalid external type") )
var ErrInvalidModelID = modelid.ErrInvalidModelID
ErrInvalidModelID is returned when a model ID is not in the expected format.
var ( // ErrInvalidTransformationType is returned when an invalid transformation type is specified ErrInvalidTransformationType = errors.New("invalid transformation type") )
var ( // ErrOverrideModelNotFound is returned when an override key doesn't match any loaded model ErrOverrideModelNotFound = errors.New("no matching model found for override key") )
var ( // ErrUnknownModelType is returned when an unknown model type is encountered ErrUnknownModelType = errors.New("unknown model type") )
Functions ¶
func FormatModelID ¶ added in v0.0.49
FormatModelID creates a standardized model ID from database and table names (format: "database.table").
func ParseModelID ¶ added in v0.0.49
ParseModelID splits a model ID into database and table components. Returns an error if the format is invalid.
Types ¶
type CacheOverride ¶ added in v0.0.24
type CacheOverride struct {
IncrementalScanInterval *time.Duration `yaml:"incremental_scan_interval,omitempty"`
FullScanInterval *time.Duration `yaml:"full_scan_interval,omitempty"`
}
CacheOverride allows overriding cache configuration
type Config ¶
type Config struct {
External ExternalConfig `yaml:"external"`
Transformation TransformationConfig `yaml:"transformations"`
Overrides map[string]*ModelOverride `yaml:"overrides,omitempty"`
Env map[string]string `yaml:"env,omitempty"`
}
Config represents the complete coordinator configuration
type DAGReader ¶ added in v0.0.2
type DAGReader interface {
// GetNode retrieves a node by its ID
GetNode(id string) (Node, error)
// GetTransformationNode retrieves a transformation node by its ID
GetTransformationNode(id string) (Transformation, error)
// GetExternalNode retrieves an external node by its ID
GetExternalNode(id string) (External, error)
// GetDependencies returns direct dependencies of a node (flattened)
GetDependencies(id string) []string
// GetStructuredDependencies returns direct dependencies preserving OR groups
GetStructuredDependencies(id string) []transformation.Dependency
// GetDependents returns nodes that depend on the given node
GetDependents(id string) []string
// GetAllDependencies returns all transitive dependencies
GetAllDependencies(id string) []string
// GetAllDependents returns all transitive dependents
GetAllDependents(id string) []string
// GetTransformationNodes returns all transformation nodes
GetTransformationNodes() []Transformation
// GetExternalNodes returns all external nodes
GetExternalNodes() []Node
// IsPathBetween checks if there's a path between two nodes
IsPathBetween(from, to string) bool
}
DAGReader provides read-only access to the dependency graph (ethPandaOps pattern)
type DependencyGraph ¶
type DependencyGraph struct {
// contains filtered or unexported fields
}
DependencyGraph manages the dependency graph for models
func NewDependencyGraph ¶
func NewDependencyGraph() *DependencyGraph
NewDependencyGraph creates a new dependency graph
func (*DependencyGraph) AddExternalModels ¶
func (d *DependencyGraph) AddExternalModels(models []External) error
AddExternalModels adds external models to the dependency graph
func (*DependencyGraph) AddTransformationEdges ¶
func (d *DependencyGraph) AddTransformationEdges(models []Transformation) error
AddTransformationEdges adds edges between transformation models based on dependencies
func (*DependencyGraph) AddTransformationModels ¶
func (d *DependencyGraph) AddTransformationModels(models []Transformation) error
AddTransformationModels adds transformation models to the dependency graph
func (*DependencyGraph) BuildGraph ¶
func (d *DependencyGraph) BuildGraph(transformationModels []Transformation, externalModels []External) error
BuildGraph builds the dependency graph from model configurations
func (*DependencyGraph) GetAllDependencies ¶
func (d *DependencyGraph) GetAllDependencies(modelID string) []string
GetAllDependencies returns all dependencies (recursive) of a model
func (*DependencyGraph) GetAllDependents ¶
func (d *DependencyGraph) GetAllDependents(modelID string) []string
GetAllDependents returns all dependents (recursive) of a model
func (*DependencyGraph) GetDependencies ¶
func (d *DependencyGraph) GetDependencies(modelID string) []string
GetDependencies returns the direct dependencies of a model. Note: this returns all dependencies flattened (including those in OR groups).
func (*DependencyGraph) GetDependents ¶
func (d *DependencyGraph) GetDependents(modelID string) []string
GetDependents returns the direct dependents of a model
func (*DependencyGraph) GetExternalNode ¶
func (d *DependencyGraph) GetExternalNode(modelID string) (External, error)
GetExternalNode retrieves an external model node from the dependency graph
func (*DependencyGraph) GetExternalNodes ¶ added in v0.0.2
func (d *DependencyGraph) GetExternalNodes() []Node
GetExternalNodes returns all external nodes from the dependency graph
func (*DependencyGraph) GetNode ¶
func (d *DependencyGraph) GetNode(modelID string) (Node, error)
GetNode retrieves a node from the dependency graph by model ID
func (*DependencyGraph) GetStructuredDependencies ¶ added in v0.0.26
func (d *DependencyGraph) GetStructuredDependencies(modelID string) []transformation.Dependency
GetStructuredDependencies returns the direct dependencies preserving OR groups
func (*DependencyGraph) GetTransformationNode ¶
func (d *DependencyGraph) GetTransformationNode(modelID string) (Transformation, error)
GetTransformationNode retrieves a transformation model node from the dependency graph
func (*DependencyGraph) GetTransformationNodes ¶
func (d *DependencyGraph) GetTransformationNodes() []Transformation
GetTransformationNodes returns all transformation nodes from the dependency graph
func (*DependencyGraph) IsPathBetween ¶
func (d *DependencyGraph) IsPathBetween(fromModelID, toModelID string) bool
IsPathBetween checks if there's a path from one model to another
type External ¶
type External interface {
GetType() string
GetID() string
GetConfig() external.Config
GetConfigMutable() *external.Config
GetValue() string
SetDefaults(defaultCluster, defaultDB string)
}
External defines the interface for external models
type ExternalConfig ¶ added in v0.0.8
type ExternalConfig struct {
Paths []string `yaml:"paths"`
DefaultCluster string `yaml:"defaultCluster"`
DefaultDatabase string `yaml:"defaultDatabase"`
}
ExternalConfig defines configuration for external models
func (*ExternalConfig) SetDefaults ¶ added in v0.0.8
func (c *ExternalConfig) SetDefaults()
SetDefaults sets default paths for external models
type ExternalOverride ¶ added in v0.0.24
type ExternalOverride struct {
Lag *uint64 `yaml:"lag,omitempty"`
Cache *CacheOverride `yaml:"cache,omitempty"`
}
ExternalOverride contains override values for external model configurations. All fields are optional; nil means keep the existing value.
type ExternalType ¶
type ExternalType string
ExternalType represents the type of an external model
const ( // ExternalTypeSQL represents SQL external model type ExternalTypeSQL ExternalType = external.ExternalTypeSQL )
type IntervalOverride ¶ added in v0.0.14
type IntervalOverride struct {
Max *uint64 `yaml:"max,omitempty"`
Min *uint64 `yaml:"min,omitempty"`
}
IntervalOverride allows overriding interval configuration
type LimitsOverride ¶ added in v0.0.14
type LimitsOverride struct {
Min *uint64 `yaml:"min,omitempty"`
Max *uint64 `yaml:"max,omitempty"`
}
LimitsOverride allows overriding position limits
type ModelFile ¶
ModelFile represents a discovered model file with its content
func DiscoverPaths ¶
DiscoverPaths walks directories to find model files (.sql, .yaml, .yml)
type ModelOverride ¶ added in v0.0.14
type ModelOverride struct {
Enabled *bool
Config OverrideConfig
// contains filtered or unexported fields
}
ModelOverride represents configuration overrides for a specific model. The actual config is stored as raw YAML and unmarshaled based on the model type.
func (*ModelOverride) ApplyToExternal ¶ added in v0.0.24
func (m *ModelOverride) ApplyToExternal(config *external.Config)
ApplyToExternal applies override configuration to an external model
func (*ModelOverride) ApplyToTransformation ¶ added in v0.0.14
func (m *ModelOverride) ApplyToTransformation(model Transformation)
ApplyToTransformation applies override configuration to a transformation model
func (*ModelOverride) IsDisabled ¶ added in v0.0.14
func (m *ModelOverride) IsDisabled() bool
IsDisabled returns true if the model is explicitly disabled
func (*ModelOverride) ResolveConfig ¶ added in v0.0.24
func (m *ModelOverride) ResolveConfig(actualType ModelType) error
ResolveConfig unmarshals the raw config based on the actual model type
func (*ModelOverride) UnmarshalYAML ¶ added in v0.0.24
func (m *ModelOverride) UnmarshalYAML(node *yaml.Node) error
UnmarshalYAML implements custom unmarshaling for ModelOverride
type ModelType ¶ added in v0.0.24
type ModelType string
ModelType identifies whether a model is external or transformation
type Node ¶
type Node struct {
NodeType NodeType
Model interface{}
}
Node represents a node in the dependency graph
type OverrideConfig ¶ added in v0.0.24
type OverrideConfig interface {
// contains filtered or unexported methods
}
OverrideConfig defines the interface for override configurations
type SchedulesOverride ¶ added in v0.0.14
type SchedulesOverride struct {
ForwardFill *string `yaml:"forwardfill"`
Backfill *string `yaml:"backfill"`
}
SchedulesOverride allows overriding schedule configuration. For each field: nil = keep existing, empty string = disable, non-empty = new schedule.
type Service ¶
type Service interface {
// Lifecycle methods
Start() error
Stop() error
// DAG operations
GetDAG() DAGReader
// Rendering operations
RenderTransformation(model Transformation, position, interval uint64, startTime time.Time) (string, error)
RenderExternal(model External, cacheState map[string]interface{}) (string, error)
GetTransformationEnvironmentVariables(model Transformation, position, interval uint64, startTime time.Time) (*[]string, error)
}
Service defines the interface for the models service (ethPandaOps pattern)
func NewService ¶
func NewService(log logrus.FieldLogger, cfg *Config, clickhouseCfg *clickhouse.Config) (Service, error)
NewService creates a new models service.
type TemplateEngine ¶
type TemplateEngine struct {
// contains filtered or unexported fields
}
TemplateEngine provides template rendering with Sprig functions
func NewTemplateEngine ¶
func NewTemplateEngine(clickhouseCfg *clickhouse.Config, dag *DependencyGraph, globalEnv map[string]string) *TemplateEngine
NewTemplateEngine creates a new template engine for rendering models
func (*TemplateEngine) GetTransformationEnvironmentVariables ¶
func (t *TemplateEngine) GetTransformationEnvironmentVariables(model Transformation, position, interval uint64, startTime time.Time) (*[]string, error)
GetTransformationEnvironmentVariables builds environment variables for transformation execution
func (*TemplateEngine) RenderExternal ¶
func (t *TemplateEngine) RenderExternal(model External, cacheState map[string]interface{}) (string, error)
RenderExternal renders an external model template with variables
func (*TemplateEngine) RenderTransformation ¶
func (t *TemplateEngine) RenderTransformation(model Transformation, position, interval uint64, startTime time.Time) (string, error)
RenderTransformation renders a transformation model template with variables
type Transformation ¶
type Transformation interface {
GetType() string
GetID() string
GetConfig() *transformation.Config
GetHandler() transformation.Handler
GetValue() string
SetDefaultDatabase(defaultDB string)
}
Transformation defines the interface for transformation models
func NewTransformation ¶
func NewTransformation(content []byte, filePath string) (Transformation, error)
NewTransformation creates a new transformation model from file content
type TransformationConfig ¶ added in v0.0.8
type TransformationConfig struct {
Paths []string `yaml:"paths"`
DefaultDatabase string `yaml:"defaultDatabase"`
}
TransformationConfig defines configuration for transformation models
func (*TransformationConfig) SetDefaults ¶ added in v0.0.8
func (c *TransformationConfig) SetDefaults()
SetDefaults sets default paths for transformation models
type TransformationOverride ¶ added in v0.0.14
type TransformationOverride struct {
// Incremental transformation fields
Interval *IntervalOverride `yaml:"interval,omitempty"`
Schedules *SchedulesOverride `yaml:"schedules,omitempty"`
Limits *LimitsOverride `yaml:"limits,omitempty"`
// Scheduled transformation fields
Schedule *string `yaml:"schedule,omitempty"`
// Common fields (both types)
Tags []string `yaml:"tags,omitempty"`
}
TransformationOverride contains override values for transformation model configurations. All fields are optional; nil means keep the existing value. The structure is type-agnostic: fields are applied based on the actual transformation type. Incremental transformations use interval, schedules (forwardfill/backfill), limits, and tags; scheduled transformations use schedule and tags.
type TransformationType ¶
type TransformationType string
TransformationType defines the type of transformation model
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
| external | Package external provides external model configuration and validation |
| modelid | Package modelid provides utilities for model identification. |
| transformation | Package transformation provides transformation model configuration and validation |
| incremental | Package incremental provides the incremental transformation type handler |
| mock | Package mock is a generated GoMock package. |
| scheduled | Package scheduled provides the scheduled transformation type handler |