Documentation
¶
Overview ¶
Package incremental provides the incremental transformation type handler
Index ¶
- Variables
- func Register()
- type Config
- type FillConfig
- type Handler
- func (h *Handler) AllowGapSkipping() bool
- func (h *Handler) AllowsPartialIntervals() bool
- func (h *Handler) ApplyOverrides(override interface{})
- func (h *Handler) Config() any
- func (h *Handler) GetAdminTable() transformation.AdminTable
- func (h *Handler) GetDependencies() []transformation.Dependency
- func (h *Handler) GetFillBuffer() uint64
- func (h *Handler) GetFillDirection() string
- func (h *Handler) GetFlatDependencies() []string
- func (h *Handler) GetFlattenedDependencies() []string
- func (h *Handler) GetID() string
- func (h *Handler) GetInterval() (minInterval, maxInterval uint64)
- func (h *Handler) GetIntervalType() string
- func (h *Handler) GetLimits() *transformation.Limits
- func (h *Handler) GetMaxInterval() uint64
- func (h *Handler) GetMinInterval() uint64
- func (h *Handler) GetOriginalDependencies() []transformation.Dependency
- func (h *Handler) GetSchedules() (forwardfill, backfill string)
- func (h *Handler) GetTags() []string
- func (h *Handler) GetTemplateVariables(_ context.Context, taskInfo transformation.TaskInfo) map[string]any
- func (h *Handler) IsBackfillEnabled() bool
- func (h *Handler) IsForwardFillEnabled() bool
- func (h *Handler) RecordCompletion(ctx context.Context, adminService any, modelID string, ...) error
- func (h *Handler) RestoreConfig(snapshot any)
- func (h *Handler) ShouldTrackPosition() bool
- func (h *Handler) SnapshotConfig() any
- func (h *Handler) SubstituteDependencyPlaceholders(externalDefaultDB, transformationDefaultDB string)
- func (h *Handler) Type() transformation.Type
- func (h *Handler) Validate() error
- type IntervalConfig
- type LimitsConfig
- type SchedulesConfig
- type Snapshot
Constants ¶
This section is empty.
Variables ¶
var ( // ErrIntervalRequired is returned when interval configuration is missing ErrIntervalRequired = errors.New("interval configuration is required") // ErrNoSchedulesConfig is returned when no schedules are configured ErrNoSchedulesConfig = errors.New("at least one schedule must be configured") // ErrDependenciesRequired is returned when dependencies are not specified ErrDependenciesRequired = errors.New("dependencies are required for incremental transformations") // ErrAdminServiceInvalid is returned when admin service doesn't implement required interface ErrAdminServiceInvalid = errors.New("admin service does not implement RecordCompletion") // ErrIntervalMaxRequired is returned when interval.max is not specified ErrIntervalMaxRequired = errors.New("interval.max is required") // ErrInvalidInterval is returned when interval.min exceeds interval.max ErrInvalidInterval = errors.New("interval.min cannot exceed interval.max") // ErrInvalidLimits is returned when min limit is greater than max limit ErrInvalidLimits = errors.New("min limit cannot be greater than max limit") // ErrIntervalTypeRequired is returned when interval.type is not specified ErrIntervalTypeRequired = errors.New("interval.type is required") // ErrInvalidFillDirection is returned when fill.direction is not 'head' or 'tail' ErrInvalidFillDirection = errors.New("fill.direction must be 'head' or 'tail'") )
Functions ¶
Types ¶
type Config ¶
type Config struct {
Type transformation.Type `yaml:"type"`
Database string `yaml:"database"`
Table string `yaml:"table"`
Limits *LimitsConfig `yaml:"limits,omitempty"`
Interval *IntervalConfig `yaml:"interval"`
Schedules *SchedulesConfig `yaml:"schedules"`
Fill *FillConfig `yaml:"fill,omitempty"`
Dependencies []transformation.Dependency `yaml:"dependencies"`
Tags []string `yaml:"tags,omitempty"`
Exec string `yaml:"exec,omitempty"`
Env map[string]string `yaml:"env,omitempty"`
SQL string `yaml:"-"` // SQL content from separate file
// OriginalDependencies stores the dependencies before placeholder substitution
OriginalDependencies []transformation.Dependency `yaml:"-"`
}
Config defines the configuration for incremental transformation models
type FillConfig ¶ added in v0.0.43
type FillConfig struct {
Direction string `yaml:"direction,omitempty"` // "head" or "tail" (default: "head")
AllowGapSkipping *bool `yaml:"allow_gap_skipping,omitempty"` // default: true
Buffer uint64 `yaml:"buffer,omitempty"` // Stay N positions behind dependency max (default: 0)
}
FillConfig defines how the transformation is initially filled and processed
func (*FillConfig) Validate ¶ added in v0.0.43
func (c *FillConfig) Validate() error
Validate checks if the fill configuration is valid
type Handler ¶
type Handler struct {
// contains filtered or unexported fields
}
Handler handles incremental transformation type operations
func NewHandler ¶
func NewHandler(data []byte, adminTable transformation.AdminTable) (*Handler, error)
NewHandler creates a new handler for incremental transformations
func (*Handler) AllowGapSkipping ¶ added in v0.0.43
AllowGapSkipping returns whether gap skipping is allowed during forward fill
func (*Handler) AllowsPartialIntervals ¶
AllowsPartialIntervals returns true if min interval is 0 (allows partial processing)
func (*Handler) ApplyOverrides ¶ added in v0.0.27
func (h *Handler) ApplyOverrides(override interface{})
ApplyOverrides applies configuration overrides to this incremental transformation handler Uses reflection to avoid circular dependency with models package
func (*Handler) GetAdminTable ¶
func (h *Handler) GetAdminTable() transformation.AdminTable
GetAdminTable returns the admin table configuration
func (*Handler) GetDependencies ¶
func (h *Handler) GetDependencies() []transformation.Dependency
GetDependencies returns the dependencies (after placeholder substitution)
func (*Handler) GetFillBuffer ¶ added in v0.0.43
GetFillBuffer returns the configured fill buffer (how far behind dependencies to stay)
func (*Handler) GetFillDirection ¶ added in v0.0.43
GetFillDirection returns the configured fill direction ("head" or "tail")
func (*Handler) GetFlatDependencies ¶
GetFlatDependencies returns dependencies as string slice (API handler interface)
func (*Handler) GetFlattenedDependencies ¶
GetFlattenedDependencies returns all dependencies as a flat string array
func (*Handler) GetInterval ¶
GetInterval returns the min and max interval sizes (API handler interface)
func (*Handler) GetIntervalType ¶
GetIntervalType returns the interval type for this incremental transformation
func (*Handler) GetLimits ¶
func (h *Handler) GetLimits() *transformation.Limits
GetLimits returns the position limits configuration
func (*Handler) GetMaxInterval ¶
GetMaxInterval returns the maximum interval size
func (*Handler) GetMinInterval ¶
GetMinInterval returns the minimum interval size
func (*Handler) GetOriginalDependencies ¶
func (h *Handler) GetOriginalDependencies() []transformation.Dependency
GetOriginalDependencies returns the original dependencies before placeholder substitution
func (*Handler) GetSchedules ¶
GetSchedules returns the forwardfill and backfill schedules (API handler interface)
func (*Handler) GetTemplateVariables ¶
func (h *Handler) GetTemplateVariables(_ context.Context, taskInfo transformation.TaskInfo) map[string]any
GetTemplateVariables returns template variables for incremental transformations
func (*Handler) IsBackfillEnabled ¶
IsBackfillEnabled returns true if backfill schedule is configured
func (*Handler) IsForwardFillEnabled ¶
IsForwardFillEnabled returns true if forward fill schedule is configured
func (*Handler) RecordCompletion ¶
func (h *Handler) RecordCompletion(ctx context.Context, adminService any, modelID string, taskInfo transformation.TaskInfo) error
RecordCompletion records the completion of an incremental transformation
func (*Handler) RestoreConfig ¶ added in v0.1.0
RestoreConfig restores overridable config values from a snapshot.
func (*Handler) ShouldTrackPosition ¶
ShouldTrackPosition returns true for incremental transformations
func (*Handler) SnapshotConfig ¶ added in v0.1.0
SnapshotConfig captures the current overridable config values.
func (*Handler) SubstituteDependencyPlaceholders ¶
func (h *Handler) SubstituteDependencyPlaceholders(externalDefaultDB, transformationDefaultDB string)
SubstituteDependencyPlaceholders replaces {{external}} and {{transformation}} placeholders
func (*Handler) Type ¶
func (h *Handler) Type() transformation.Type
Type returns the transformation type (incremental)
type IntervalConfig ¶
type IntervalConfig struct {
Max uint64 `yaml:"max"` // Maximum interval size for processing
Min uint64 `yaml:"min"` // Minimum interval size (0 = allow any partial size)
Type string `yaml:"type" json:"type"` // Required: examples: "second", "slot", "epoch", "block"
}
IntervalConfig defines interval configuration for transformations
func (*IntervalConfig) Validate ¶
func (c *IntervalConfig) Validate() error
Validate checks if the interval configuration is valid
type LimitsConfig ¶
LimitsConfig defines position limits for transformations
func (*LimitsConfig) Validate ¶
func (c *LimitsConfig) Validate() error
Validate checks if the limits configuration is valid
type SchedulesConfig ¶
type SchedulesConfig struct {
ForwardFill string `yaml:"forwardfill,omitempty"` // Forward fill schedule (optional)
Backfill string `yaml:"backfill,omitempty"` // Backfill schedule (optional)
}
SchedulesConfig defines scheduling configuration for transformations
func (*SchedulesConfig) Validate ¶
func (c *SchedulesConfig) Validate() error
Validate checks if the schedules configuration is valid
type Snapshot ¶ added in v0.1.0
type Snapshot struct {
IntervalMin uint64
IntervalMax uint64
ForwardFill string
Backfill string
HasLimits bool
LimitsMin uint64
LimitsMax uint64
Tags []string
}
Snapshot holds a point-in-time snapshot of overridable config fields.
func (*Snapshot) ToBaseConfigJSON ¶ added in v0.1.0
func (s *Snapshot) ToBaseConfigJSON() (json.RawMessage, error)
ToBaseConfigJSON serializes the snapshot to a JSON representation suitable for the management API's base_config response.