tasks

package
v0.0.35 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 14, 2025 License: GPL-3.0 Imports: 15 Imported by: 0

Documentation

Overview

Package tasks provides task handling and execution functionality

Index

Constants

View Source
const (
	// DirectionForward represents forward fill processing
	DirectionForward = "forward"
	// DirectionBack represents backfill processing
	DirectionBack = "back"
)
View Source
const (
	// TypeModelTransformation is the task type for model transformations
	TypeModelTransformation = "model:transformation"
)

Variables

View Source
var (
	// ErrModelConfigNotFound is returned when model configuration is not found
	ErrModelConfigNotFound = errors.New("model configuration not found")
	// ErrDependenciesNotSatisfied is returned when dependencies are not satisfied
	ErrDependenciesNotSatisfied = errors.New("dependencies not satisfied")
	// ErrModelIDNotFound is returned when model_id is not found in payload
	ErrModelIDNotFound = errors.New("model_id not found in payload")
	// ErrScanTypeNotFound is returned when scan_type is not found in payload
	ErrScanTypeNotFound = errors.New("scan_type not found in payload")
	// ErrCacheManagerUnavailable is returned when cache manager is not available
	ErrCacheManagerUnavailable = errors.New("cache manager not available")
)

Functions

This section is empty.

Types

type Executor

type Executor interface {
	Execute(ctx context.Context, taskCtx interface{}) error
	Validate(ctx context.Context, taskCtx interface{}) error
	UpdateBounds(ctx context.Context, modelID, scanType string) error
}

Executor defines the interface for task executors

type IncrementalTaskPayload added in v0.0.25

type IncrementalTaskPayload struct {
	ModelID    string    `json:"model_id"`
	Position   uint64    `json:"position"`
	Interval   uint64    `json:"interval"`
	Direction  string    `json:"direction"` // DirectionForward or DirectionBack
	EnqueuedAt time.Time `json:"enqueued_at"`
}

IncrementalTaskPayload represents a position-based incremental task

func (IncrementalTaskPayload) GetEnqueuedAt added in v0.0.25

func (p IncrementalTaskPayload) GetEnqueuedAt() time.Time

GetEnqueuedAt returns the enqueued time

func (IncrementalTaskPayload) GetModelID added in v0.0.25

func (p IncrementalTaskPayload) GetModelID() string

GetModelID returns the model ID

func (IncrementalTaskPayload) GetType added in v0.0.25

func (p IncrementalTaskPayload) GetType() TaskType

GetType returns the task type

func (IncrementalTaskPayload) QueueName added in v0.0.25

func (p IncrementalTaskPayload) QueueName() string

QueueName returns the queue name for this task

func (IncrementalTaskPayload) UniqueID added in v0.0.25

func (p IncrementalTaskPayload) UniqueID() string

UniqueID returns a unique identifier for this task

type QueueManager

type QueueManager struct {
	// contains filtered or unexported fields
}

QueueManager manages task queuing

func NewQueueManager

func NewQueueManager(redisOpt *asynq.RedisClientOpt) *QueueManager

NewQueueManager creates a new queue manager

func (*QueueManager) Close

func (q *QueueManager) Close() error

Close closes the queue manager

func (*QueueManager) Enqueue added in v0.0.2

func (q *QueueManager) Enqueue(task *asynq.Task, opts ...asynq.Option) (*asynq.TaskInfo, error)

Enqueue enqueues a generic task

func (*QueueManager) EnqueueTransformation

func (q *QueueManager) EnqueueTransformation(payload TaskPayload, opts ...asynq.Option) error

EnqueueTransformation enqueues a transformation task

func (*QueueManager) IsTaskPendingOrRunning

func (q *QueueManager) IsTaskPendingOrRunning(task TaskPayload) (bool, error)

IsTaskPendingOrRunning checks if a task is pending or running

type ScheduledTaskPayload added in v0.0.25

type ScheduledTaskPayload struct {
	ModelID       string    `json:"model_id"`
	ExecutionTime time.Time `json:"execution_time"`
	EnqueuedAt    time.Time `json:"enqueued_at"`
}

ScheduledTaskPayload represents a scheduled cron-based task

func (ScheduledTaskPayload) GetEnqueuedAt added in v0.0.25

func (p ScheduledTaskPayload) GetEnqueuedAt() time.Time

GetEnqueuedAt returns the enqueued time

func (ScheduledTaskPayload) GetModelID added in v0.0.25

func (p ScheduledTaskPayload) GetModelID() string

GetModelID returns the model ID

func (ScheduledTaskPayload) GetType added in v0.0.25

func (p ScheduledTaskPayload) GetType() TaskType

GetType returns the task type

func (ScheduledTaskPayload) QueueName added in v0.0.25

func (p ScheduledTaskPayload) QueueName() string

QueueName returns the queue name for this task

func (ScheduledTaskPayload) UniqueID added in v0.0.25

func (p ScheduledTaskPayload) UniqueID() string

UniqueID returns a unique identifier for this task

type TaskContext

type TaskContext struct {
	Transformation models.Transformation
	Position       uint64
	Interval       uint64
	StartTime      time.Time
	Variables      map[string]interface{}
}

TaskContext contains all context needed for task execution

type TaskHandler

type TaskHandler struct {
	// contains filtered or unexported fields
}

TaskHandler handles task execution

func NewTaskHandler

func NewTaskHandler(
	logger logrus.FieldLogger,
	chClient clickhouse.ClientInterface,
	adminService admin.Service,
	validator validation.Validator,
	modelExecutor Executor,
	transformations []models.Transformation,
) *TaskHandler

NewTaskHandler creates a new task handler

func (*TaskHandler) HandleExternalScan added in v0.0.18

func (h *TaskHandler) HandleExternalScan(ctx context.Context, t *asynq.Task) error

HandleExternalScan handles external model scan tasks (both incremental and full)

func (*TaskHandler) HandleTransformation

func (h *TaskHandler) HandleTransformation(ctx context.Context, t *asynq.Task) error

HandleTransformation handles transformation tasks

func (*TaskHandler) Routes

func (h *TaskHandler) Routes() map[string]asynq.HandlerFunc

Routes returns the task handler routes for Asynq

type TaskPayload

type TaskPayload interface {
	GetModelID() string
	GetEnqueuedAt() time.Time
	GetType() TaskType
	UniqueID() string
	QueueName() string
}

TaskPayload is the common interface for all task payloads

type TaskType added in v0.0.25

type TaskType string

TaskType indicates whether this is a scheduled or incremental task

const (
	// TaskTypeIncremental represents incremental position-based tasks
	TaskTypeIncremental TaskType = "incremental"
	// TaskTypeScheduled represents scheduled cron-based tasks
	TaskTypeScheduled TaskType = "scheduled"
	// TaskTypeExternal represents external model scan tasks
	TaskTypeExternal TaskType = "external"
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL