incremental

package
v0.0.31 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 13, 2025 License: GPL-3.0 Imports: 9 Imported by: 0

Documentation

Overview

Package incremental provides the handler for the incremental transformation type.

Index

Constants

This section is empty.

Variables

View Source
var (
	// Sentinel errors for the top-level sections of the configuration.

	// ErrDependenciesRequired signals that no dependencies were specified.
	ErrDependenciesRequired = errors.New("dependencies are required for incremental transformations")
	// ErrNoSchedulesConfig signals that no schedules were configured.
	ErrNoSchedulesConfig = errors.New("at least one schedule must be configured")
	// ErrIntervalRequired signals that the interval configuration is missing.
	ErrIntervalRequired = errors.New("interval configuration is required")

	// Sentinel errors for the individual interval.* settings.

	// ErrIntervalTypeRequired signals that interval.type was not specified.
	ErrIntervalTypeRequired = errors.New("interval.type is required")
	// ErrIntervalMaxRequired signals that interval.max was not specified.
	ErrIntervalMaxRequired = errors.New("interval.max is required")
	// ErrInvalidInterval signals that interval.min exceeds interval.max.
	ErrInvalidInterval = errors.New("interval.min cannot exceed interval.max")

	// Sentinel error for the position limits.

	// ErrInvalidLimits signals that the min limit is greater than the max limit.
	ErrInvalidLimits = errors.New("min limit cannot be greater than max limit")

	// Sentinel error for completion recording.

	// ErrAdminServiceInvalid signals that the supplied admin service does not
	// implement the interface required for recording completions.
	ErrAdminServiceInvalid = errors.New("admin service does not implement RecordCompletion")
)

Functions

func Register

func Register()

Register registers the incremental handler with the global registry

func ValidateScheduleFormat

func ValidateScheduleFormat(schedule string) error

ValidateScheduleFormat validates a cron schedule expression

Types

type Config

// Config is the configuration for an incremental transformation model.
//
// Fields are populated from YAML under the keys given in their struct tags.
// Limits, Tags, Exec and Env are tagged omitempty; Type, Database, Table,
// Interval, Schedules and Dependencies are not. SQL and OriginalDependencies
// are tagged yaml:"-", so they are not part of the YAML document and must be
// filled in by code.
//
// NOTE(review): the package exports ErrIntervalRequired, ErrNoSchedulesConfig
// and ErrDependenciesRequired; presumably Handler.Validate returns them when
// Interval, Schedules or Dependencies are missing — confirm against the
// implementation, which is not shown here.
type Config struct {
	Type         transformation.Type         `yaml:"type"`
	Database     string                      `yaml:"database"`
	Table        string                      `yaml:"table"`
	Limits       *LimitsConfig               `yaml:"limits,omitempty"`
	Interval     *IntervalConfig             `yaml:"interval"`
	Schedules    *SchedulesConfig            `yaml:"schedules"`
	Dependencies []transformation.Dependency `yaml:"dependencies"`
	Tags         []string                    `yaml:"tags,omitempty"`
	Exec         string                      `yaml:"exec,omitempty"`
	Env          map[string]string           `yaml:"env,omitempty"`
	SQL          string                      `yaml:"-"` // SQL content from separate file

	// OriginalDependencies stores the dependencies before placeholder substitution
	OriginalDependencies []transformation.Dependency `yaml:"-"`
}

Config defines the configuration for incremental transformation models

type Handler

// Handler handles incremental transformation type operations.
//
// All of its fields are unexported, so values are obtained through NewHandler
// rather than constructed directly.
type Handler struct {
	// contains filtered or unexported fields
}

Handler handles incremental transformation type operations

func NewHandler

func NewHandler(data []byte, adminTable transformation.AdminTable) (*Handler, error)

NewHandler creates a new handler for incremental transformations

func (*Handler) AllowsPartialIntervals

func (h *Handler) AllowsPartialIntervals() bool

AllowsPartialIntervals returns true if min interval is 0 (allows partial processing)

func (*Handler) ApplyOverrides added in v0.0.27

func (h *Handler) ApplyOverrides(override interface{})

ApplyOverrides applies configuration overrides to this incremental transformation handler. It uses reflection to avoid a circular dependency with the models package.

func (*Handler) Config

func (h *Handler) Config() any

Config returns the typed configuration

func (*Handler) GetAdminTable

func (h *Handler) GetAdminTable() transformation.AdminTable

GetAdminTable returns the admin table configuration

func (*Handler) GetDependencies

func (h *Handler) GetDependencies() []transformation.Dependency

GetDependencies returns the dependencies (after placeholder substitution)

func (*Handler) GetFlatDependencies

func (h *Handler) GetFlatDependencies() []string

GetFlatDependencies returns dependencies as string slice (API handler interface)

func (*Handler) GetFlattenedDependencies

func (h *Handler) GetFlattenedDependencies() []string

GetFlattenedDependencies returns all dependencies as a flat string array

func (*Handler) GetID

func (h *Handler) GetID() string

GetID returns the unique identifier for the transformation model

func (*Handler) GetInterval

func (h *Handler) GetInterval() (minInterval, maxInterval uint64)

GetInterval returns the min and max interval sizes (API handler interface)

func (*Handler) GetIntervalType

func (h *Handler) GetIntervalType() string

GetIntervalType returns the interval type for this incremental transformation

func (*Handler) GetLimits

func (h *Handler) GetLimits() *struct{ Min, Max uint64 }

GetLimits returns the position limits configuration

func (*Handler) GetMaxInterval

func (h *Handler) GetMaxInterval() uint64

GetMaxInterval returns the maximum interval size

func (*Handler) GetMinInterval

func (h *Handler) GetMinInterval() uint64

GetMinInterval returns the minimum interval size

func (*Handler) GetOriginalDependencies

func (h *Handler) GetOriginalDependencies() []transformation.Dependency

GetOriginalDependencies returns the original dependencies before placeholder substitution

func (*Handler) GetSchedules

func (h *Handler) GetSchedules() (forwardfill, backfill string)

GetSchedules returns the forwardfill and backfill schedules (API handler interface)

func (*Handler) GetTags

func (h *Handler) GetTags() []string

GetTags returns the tags for this transformation (API handler interface)

func (*Handler) GetTemplateVariables

func (h *Handler) GetTemplateVariables(_ context.Context, taskInfo transformation.TaskInfo) map[string]any

GetTemplateVariables returns template variables for incremental transformations

func (*Handler) IsBackfillEnabled

func (h *Handler) IsBackfillEnabled() bool

IsBackfillEnabled returns true if backfill schedule is configured

func (*Handler) IsForwardFillEnabled

func (h *Handler) IsForwardFillEnabled() bool

IsForwardFillEnabled returns true if forward fill schedule is configured

func (*Handler) RecordCompletion

func (h *Handler) RecordCompletion(ctx context.Context, adminService any, modelID string, taskInfo transformation.TaskInfo) error

RecordCompletion records the completion of an incremental transformation

func (*Handler) ShouldTrackPosition

func (h *Handler) ShouldTrackPosition() bool

ShouldTrackPosition returns true for incremental transformations

func (*Handler) SubstituteDependencyPlaceholders

func (h *Handler) SubstituteDependencyPlaceholders(externalDefaultDB, transformationDefaultDB string)

SubstituteDependencyPlaceholders replaces {{external}} and {{transformation}} placeholders

func (*Handler) Type

func (h *Handler) Type() transformation.Type

Type returns the transformation type (incremental)

func (*Handler) Validate

func (h *Handler) Validate() error

Validate validates the configuration for incremental transformations

type IntervalConfig

// IntervalConfig defines the interval configuration for a transformation:
// the maximum and minimum interval sizes and the interval type.
//
// A Min of 0 means any partial interval size is allowed. Type is required;
// the example values listed below are illustrations, not necessarily an
// exhaustive set.
//
// NOTE(review): only Type carries a json tag; Max and Min have yaml tags
// only, so JSON encoding would use the Go field names for them. Confirm
// whether that asymmetry is intentional.
//
// NOTE(review): the package exports ErrIntervalMaxRequired, ErrInvalidInterval
// and ErrIntervalTypeRequired; presumably Validate returns them for a missing
// Max, a Min greater than Max, and a missing Type respectively — confirm
// against the implementation.
type IntervalConfig struct {
	Max  uint64 `yaml:"max"`              // Maximum interval size for processing
	Min  uint64 `yaml:"min"`              // Minimum interval size (0 = allow any partial size)
	Type string `yaml:"type" json:"type"` // Required: examples: "second", "slot", "epoch", "block"
}

IntervalConfig defines interval configuration for transformations

func (*IntervalConfig) Validate

func (c *IntervalConfig) Validate() error

Validate checks if the interval configuration is valid

type LimitsConfig

// LimitsConfig defines the position limits for a transformation as a
// Min/Max pair. Both keys are tagged omitempty.
//
// NOTE(review): the package exports ErrInvalidLimits ("min limit cannot be
// greater than max limit"); presumably Validate returns it when Min > Max.
// Whether a zero value means "no limit" is not visible here — confirm against
// the implementation.
type LimitsConfig struct {
	Min uint64 `yaml:"min,omitempty"`
	Max uint64 `yaml:"max,omitempty"`
}

LimitsConfig defines position limits for transformations

func (*LimitsConfig) Validate

func (c *LimitsConfig) Validate() error

Validate checks if the limits configuration is valid

type SchedulesConfig

// SchedulesConfig defines the scheduling configuration for a transformation.
// It holds two independent schedule strings, one for forward fill and one for
// backfill; each is optional on its own.
//
// NOTE(review): the package exports ErrNoSchedulesConfig ("at least one
// schedule must be configured") and ValidateScheduleFormat, which validates a
// cron schedule expression; presumably Validate requires at least one
// non-empty field and checks each as a cron expression — confirm against the
// implementation.
type SchedulesConfig struct {
	ForwardFill string `yaml:"forwardfill,omitempty"` // Forward fill schedule (optional)
	Backfill    string `yaml:"backfill,omitempty"`    // Backfill schedule (optional)
}

SchedulesConfig defines scheduling configuration for transformations

func (*SchedulesConfig) Validate

func (c *SchedulesConfig) Validate() error

Validate checks if the schedules configuration is valid

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL