tasks

package
v0.0.21 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 26, 2025 License: GPL-3.0 Imports: 14 Imported by: 0

Documentation

Overview

Package tasks provides task handling and execution functionality

Index

Constants

View Source
const (
	// DirectionForward represents forward fill processing
	DirectionForward = "forward"
	// DirectionBack represents backfill processing
	DirectionBack = "back"
)
View Source
const (
	// TypeModelTransformation is the task type for model transformations
	TypeModelTransformation = "model:transformation"
)

Variables

View Source
var (
	// ErrModelConfigNotFound is returned when model configuration is not found
	ErrModelConfigNotFound = errors.New("model configuration not found")
	// ErrDependenciesNotSatisfied is returned when dependencies are not satisfied
	ErrDependenciesNotSatisfied = errors.New("dependencies not satisfied")
	// ErrModelIDNotFound is returned when model_id is not found in payload
	ErrModelIDNotFound = errors.New("model_id not found in payload")
	// ErrScanTypeNotFound is returned when scan_type is not found in payload
	ErrScanTypeNotFound = errors.New("scan_type not found in payload")
	// ErrCacheManagerUnavailable is returned when cache manager is not available
	ErrCacheManagerUnavailable = errors.New("cache manager not available")
)

Functions

This section is empty.

Types

type Executor

type Executor interface {
	Execute(ctx context.Context, taskCtx interface{}) error
	Validate(ctx context.Context, taskCtx interface{}) error
	UpdateBounds(ctx context.Context, modelID, scanType string) error
}

Executor defines the interface for task executors

type QueueManager

type QueueManager struct {
	// contains filtered or unexported fields
}

QueueManager manages task queuing

func NewQueueManager

func NewQueueManager(redisOpt *asynq.RedisClientOpt) *QueueManager

NewQueueManager creates a new queue manager

func (*QueueManager) Close

func (q *QueueManager) Close() error

Close closes the queue manager

func (*QueueManager) Enqueue added in v0.0.2

func (q *QueueManager) Enqueue(task *asynq.Task, opts ...asynq.Option) (*asynq.TaskInfo, error)

Enqueue enqueues a generic task

func (*QueueManager) EnqueueTransformation

func (q *QueueManager) EnqueueTransformation(payload TaskPayload, opts ...asynq.Option) error

EnqueueTransformation enqueues a transformation task

func (*QueueManager) IsTaskPendingOrRunning

func (q *QueueManager) IsTaskPendingOrRunning(task TaskPayload) (bool, error)

IsTaskPendingOrRunning checks if a task is pending or running

type TaskContext

type TaskContext struct {
	Transformation models.Transformation
	Position       uint64
	Interval       uint64
	StartTime      time.Time
	Variables      map[string]interface{}
}

TaskContext contains all context needed for task execution

type TaskHandler

type TaskHandler struct {
	// contains filtered or unexported fields
}

TaskHandler handles task execution

func NewTaskHandler

func NewTaskHandler(
	logger logrus.FieldLogger,
	chClient clickhouse.ClientInterface,
	adminService admin.Service,
	validator validation.Validator,
	modelExecutor Executor,
	transformations []models.Transformation,
) *TaskHandler

NewTaskHandler creates a new task handler

func (*TaskHandler) HandleExternalScan added in v0.0.18

func (h *TaskHandler) HandleExternalScan(ctx context.Context, t *asynq.Task) error

HandleExternalScan handles external model scan tasks (both incremental and full)

func (*TaskHandler) HandleTransformation

func (h *TaskHandler) HandleTransformation(ctx context.Context, t *asynq.Task) error

HandleTransformation handles transformation tasks

func (*TaskHandler) Routes

func (h *TaskHandler) Routes() map[string]asynq.HandlerFunc

Routes returns the task handler routes for Asynq

type TaskPayload

type TaskPayload struct {
	ModelID    string    `json:"model_id"`
	Position   uint64    `json:"position"`
	Interval   uint64    `json:"interval"`
	Direction  string    `json:"direction"` // DirectionForward or DirectionBack
	EnqueuedAt time.Time `json:"enqueued_at"`
}

TaskPayload represents the payload for a transformation task

func (TaskPayload) QueueName

func (p TaskPayload) QueueName() string

QueueName returns the queue name for this task payload

func (TaskPayload) UniqueID

func (p TaskPayload) UniqueID() string

UniqueID returns a unique identifier for this task

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL