Documentation
¶
Overview ¶
Package tasks provides task handling and execution functionality
Index ¶
- Constants
- Variables
- type Executor
- type QueueManager
- func (q *QueueManager) Close() error
- func (q *QueueManager) Enqueue(task *asynq.Task, opts ...asynq.Option) (*asynq.TaskInfo, error)
- func (q *QueueManager) EnqueueTransformation(payload TaskPayload, opts ...asynq.Option) error
- func (q *QueueManager) IsTaskPendingOrRunning(task TaskPayload) (bool, error)
- type TaskContext
- type TaskHandler
- type TaskPayload
Constants ¶
const ( // DirectionForward represents forward fill processing DirectionForward = "forward" // DirectionBack represents backfill processing DirectionBack = "back" )
const (
// TypeModelTransformation is the task type for model transformations
TypeModelTransformation = "model:transformation"
)
Variables ¶
var ( // ErrModelConfigNotFound is returned when model configuration is not found ErrModelConfigNotFound = errors.New("model configuration not found") // ErrDependenciesNotSatisfied is returned when dependencies are not satisfied ErrDependenciesNotSatisfied = errors.New("dependencies not satisfied") // ErrModelIDNotFound is returned when model_id is not found in payload ErrModelIDNotFound = errors.New("model_id not found in payload") // ErrScanTypeNotFound is returned when scan_type is not found in payload ErrScanTypeNotFound = errors.New("scan_type not found in payload") ErrCacheManagerUnavailable = errors.New("cache manager not available") )
Functions ¶
This section is empty.
Types ¶
type Executor ¶
type Executor interface {
Execute(ctx context.Context, taskCtx interface{}) error
Validate(ctx context.Context, taskCtx interface{}) error
UpdateBounds(ctx context.Context, modelID, scanType string) error
}
Executor defines the interface for task executors
type QueueManager ¶
type QueueManager struct {
// contains filtered or unexported fields
}
QueueManager manages task queuing
func NewQueueManager ¶
func NewQueueManager(redisOpt *asynq.RedisClientOpt) *QueueManager
NewQueueManager creates a new queue manager
func (*QueueManager) EnqueueTransformation ¶
func (q *QueueManager) EnqueueTransformation(payload TaskPayload, opts ...asynq.Option) error
EnqueueTransformation enqueues a transformation task
func (*QueueManager) IsTaskPendingOrRunning ¶
func (q *QueueManager) IsTaskPendingOrRunning(task TaskPayload) (bool, error)
IsTaskPendingOrRunning checks if a task is pending or running
type TaskContext ¶
type TaskContext struct {
Transformation models.Transformation
Position uint64
Interval uint64
StartTime time.Time
Variables map[string]interface{}
}
TaskContext contains all context needed for task execution
type TaskHandler ¶
type TaskHandler struct {
// contains filtered or unexported fields
}
TaskHandler handles task execution
func NewTaskHandler ¶
func NewTaskHandler( logger logrus.FieldLogger, chClient clickhouse.ClientInterface, adminService admin.Service, validator validation.Validator, modelExecutor Executor, transformations []models.Transformation, ) *TaskHandler
NewTaskHandler creates a new task handler
func (*TaskHandler) HandleExternalScan ¶ added in v0.0.18
HandleExternalScan handles external model scan tasks (both incremental and full)
func (*TaskHandler) HandleTransformation ¶
HandleTransformation handles transformation tasks
func (*TaskHandler) Routes ¶
func (h *TaskHandler) Routes() map[string]asynq.HandlerFunc
Routes returns the task handler routes for Asynq
type TaskPayload ¶
type TaskPayload struct {
ModelID string `json:"model_id"`
Position uint64 `json:"position"`
Interval uint64 `json:"interval"`
Direction string `json:"direction"` // DirectionForward or DirectionBack
EnqueuedAt time.Time `json:"enqueued_at"`
}
TaskPayload represents the payload for a transformation task
func (TaskPayload) QueueName ¶
func (p TaskPayload) QueueName() string
QueueName returns the queue name for this task payload
func (TaskPayload) UniqueID ¶
func (p TaskPayload) UniqueID() string
UniqueID returns a unique identifier for this task