Documentation
¶
Overview ¶
Package incremental provides the incremental transformation type handler
Index ¶
- Variables
- func Register()
- func ValidateScheduleFormat(schedule string) error
- type Config
- type Handler
- func (h *Handler) AllowsPartialIntervals() bool
- func (h *Handler) Config() any
- func (h *Handler) GetAdminTable() transformation.AdminTable
- func (h *Handler) GetDependencies() []transformation.Dependency
- func (h *Handler) GetFlatDependencies() []string
- func (h *Handler) GetFlattenedDependencies() []string
- func (h *Handler) GetID() string
- func (h *Handler) GetInterval() (minInterval, maxInterval uint64)
- func (h *Handler) GetIntervalType() string
- func (h *Handler) GetLimits() *struct{ Min, Max uint64 }
- func (h *Handler) GetMaxInterval() uint64
- func (h *Handler) GetMinInterval() uint64
- func (h *Handler) GetOriginalDependencies() []transformation.Dependency
- func (h *Handler) GetSchedules() (forwardfill, backfill string)
- func (h *Handler) GetTags() []string
- func (h *Handler) GetTemplateVariables(_ context.Context, taskInfo transformation.TaskInfo) map[string]any
- func (h *Handler) IsBackfillEnabled() bool
- func (h *Handler) IsForwardFillEnabled() bool
- func (h *Handler) RecordCompletion(ctx context.Context, adminService any, modelID string, ...) error
- func (h *Handler) ShouldTrackPosition() bool
- func (h *Handler) SubstituteDependencyPlaceholders(externalDefaultDB, transformationDefaultDB string)
- func (h *Handler) Type() transformation.Type
- func (h *Handler) Validate() error
- type IntervalConfig
- type LimitsConfig
- type SchedulesConfig
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrIntervalRequired is returned when interval configuration is missing
	ErrIntervalRequired = errors.New("interval configuration is required")
	// ErrNoSchedulesConfig is returned when no schedules are configured
	ErrNoSchedulesConfig = errors.New("at least one schedule must be configured")
	// ErrDependenciesRequired is returned when dependencies are not specified
	ErrDependenciesRequired = errors.New("dependencies are required for incremental transformations")
	// ErrAdminServiceInvalid is returned when admin service doesn't implement required interface
	ErrAdminServiceInvalid = errors.New("admin service does not implement RecordCompletion")
	// ErrIntervalMaxRequired is returned when interval.max is not specified
	ErrIntervalMaxRequired = errors.New("interval.max is required")
	// ErrInvalidInterval is returned when interval.min exceeds interval.max
	ErrInvalidInterval = errors.New("interval.min cannot exceed interval.max")
	// ErrInvalidLimits is returned when min limit is greater than max limit
	ErrInvalidLimits = errors.New("min limit cannot be greater than max limit")
	// ErrIntervalTypeRequired is returned when interval.type is not specified
	ErrIntervalTypeRequired = errors.New("interval.type is required")
)
Functions ¶
func ValidateScheduleFormat ¶
ValidateScheduleFormat validates a cron schedule expression
Types ¶
type Config ¶
type Config struct {
Type transformation.Type `yaml:"type"`
Database string `yaml:"database"`
Table string `yaml:"table"`
Limits *LimitsConfig `yaml:"limits,omitempty"`
Interval *IntervalConfig `yaml:"interval"`
Schedules *SchedulesConfig `yaml:"schedules"`
Dependencies []transformation.Dependency `yaml:"dependencies"`
Tags []string `yaml:"tags,omitempty"`
Exec string `yaml:"exec,omitempty"`
Env map[string]string `yaml:"env,omitempty"`
SQL string `yaml:"-"` // SQL content from separate file
// OriginalDependencies stores the dependencies before placeholder substitution
OriginalDependencies []transformation.Dependency `yaml:"-"`
}
Config defines the configuration for incremental transformation models
type Handler ¶
type Handler struct {
// contains filtered or unexported fields
}
Handler handles incremental transformation type operations
func NewHandler ¶
func NewHandler(data []byte, adminTable transformation.AdminTable) (*Handler, error)
NewHandler creates a new handler for incremental transformations
func (*Handler) AllowsPartialIntervals ¶
AllowsPartialIntervals returns true if min interval is 0 (allows partial processing)
func (*Handler) GetAdminTable ¶
func (h *Handler) GetAdminTable() transformation.AdminTable
GetAdminTable returns the admin table configuration
func (*Handler) GetDependencies ¶
func (h *Handler) GetDependencies() []transformation.Dependency
GetDependencies returns the dependencies (after placeholder substitution)
func (*Handler) GetFlatDependencies ¶
GetFlatDependencies returns dependencies as string slice (API handler interface)
func (*Handler) GetFlattenedDependencies ¶
GetFlattenedDependencies returns all dependencies as a flat string array
func (*Handler) GetInterval ¶
GetInterval returns the min and max interval sizes (API handler interface)
func (*Handler) GetIntervalType ¶
GetIntervalType returns the interval type for this incremental transformation
func (*Handler) GetMaxInterval ¶
GetMaxInterval returns the maximum interval size
func (*Handler) GetMinInterval ¶
GetMinInterval returns the minimum interval size
func (*Handler) GetOriginalDependencies ¶
func (h *Handler) GetOriginalDependencies() []transformation.Dependency
GetOriginalDependencies returns the original dependencies before placeholder substitution
func (*Handler) GetSchedules ¶
GetSchedules returns the forwardfill and backfill schedules (API handler interface)
func (*Handler) GetTemplateVariables ¶
func (h *Handler) GetTemplateVariables(_ context.Context, taskInfo transformation.TaskInfo) map[string]any
GetTemplateVariables returns template variables for incremental transformations
func (*Handler) IsBackfillEnabled ¶
IsBackfillEnabled returns true if backfill schedule is configured
func (*Handler) IsForwardFillEnabled ¶
IsForwardFillEnabled returns true if forward fill schedule is configured
func (*Handler) RecordCompletion ¶
func (h *Handler) RecordCompletion(ctx context.Context, adminService any, modelID string, taskInfo transformation.TaskInfo) error
RecordCompletion records the completion of an incremental transformation
func (*Handler) ShouldTrackPosition ¶
ShouldTrackPosition returns true for incremental transformations
func (*Handler) SubstituteDependencyPlaceholders ¶
func (h *Handler) SubstituteDependencyPlaceholders(externalDefaultDB, transformationDefaultDB string)
SubstituteDependencyPlaceholders replaces {{external}} and {{transformation}} placeholders
func (*Handler) Type ¶
func (h *Handler) Type() transformation.Type
Type returns the transformation type (incremental)
type IntervalConfig ¶
type IntervalConfig struct {
Max uint64 `yaml:"max"` // Maximum interval size for processing
Min uint64 `yaml:"min"` // Minimum interval size (0 = allow any partial size)
Type string `yaml:"type" json:"type"` // Required: examples: "second", "slot", "epoch", "block"
}
IntervalConfig defines interval configuration for transformations
func (*IntervalConfig) Validate ¶
func (c *IntervalConfig) Validate() error
Validate checks if the interval configuration is valid
type LimitsConfig ¶
LimitsConfig defines position limits for transformations
func (*LimitsConfig) Validate ¶
func (c *LimitsConfig) Validate() error
Validate checks if the limits configuration is valid
type SchedulesConfig ¶
type SchedulesConfig struct {
ForwardFill string `yaml:"forwardfill,omitempty"` // Forward fill schedule (optional)
Backfill string `yaml:"backfill,omitempty"` // Backfill schedule (optional)
}
SchedulesConfig defines scheduling configuration for transformations
func (*SchedulesConfig) Validate ¶
func (c *SchedulesConfig) Validate() error
Validate checks if the schedules configuration is valid