Documentation ¶
Overview ¶
Package tasks provides task handling and execution functionality.
Index ¶
- Constants
- Variables
- type Executor
- type QueueManager
- func (q *QueueManager) Close() error
- func (q *QueueManager) EnqueueTransformation(payload TaskPayload, opts ...asynq.Option) error
- func (q *QueueManager) GetQueueStats(queueName string) (*asynq.QueueInfo, error)
- func (q *QueueManager) IsTaskPendingOrRunning(task TaskPayload) (bool, error)
- func (q *QueueManager) WasRecentlyCompleted(task TaskPayload, within time.Duration) (bool, error)
- type TaskContext
- type TaskHandler
- type TaskPayload
Constants ¶
const (
// TypeModelTransformation is the task type for model transformations
TypeModelTransformation = "model:transformation"
)
Variables ¶
var (
// ErrModelConfigNotFound is returned when model configuration is not found
ErrModelConfigNotFound = errors.New("model configuration not found")
// ErrDependenciesNotSatisfied is returned when dependencies are not satisfied
ErrDependenciesNotSatisfied = errors.New("dependencies not satisfied")
)
Functions ¶
This section is empty.
Types ¶
type Executor ¶
type Executor interface {
Execute(ctx context.Context, taskCtx interface{}) error
Validate(ctx context.Context, taskCtx interface{}) error
}
Executor defines the interface for task executors
type QueueManager ¶
type QueueManager struct {
// contains filtered or unexported fields
}
QueueManager manages task queuing
func NewQueueManager ¶
func NewQueueManager(redisOpt *asynq.RedisClientOpt) *QueueManager
NewQueueManager creates a new queue manager
func (*QueueManager) EnqueueTransformation ¶
func (q *QueueManager) EnqueueTransformation(payload TaskPayload, opts ...asynq.Option) error
EnqueueTransformation enqueues a transformation task
func (*QueueManager) GetQueueStats ¶
func (q *QueueManager) GetQueueStats(queueName string) (*asynq.QueueInfo, error)
GetQueueStats returns queue statistics
func (*QueueManager) IsTaskPendingOrRunning ¶
func (q *QueueManager) IsTaskPendingOrRunning(task TaskPayload) (bool, error)
IsTaskPendingOrRunning reports whether a task is pending or running.
func (*QueueManager) WasRecentlyCompleted ¶
func (q *QueueManager) WasRecentlyCompleted(task TaskPayload, within time.Duration) (bool, error)
WasRecentlyCompleted checks if a task was recently completed
type TaskContext ¶
type TaskContext struct {
Transformation models.Transformation
Position uint64
Interval uint64
StartTime time.Time
Variables map[string]interface{}
}
TaskContext contains all context needed for task execution
type TaskHandler ¶
type TaskHandler struct {
// contains filtered or unexported fields
}
TaskHandler handles task execution
func NewTaskHandler ¶
func NewTaskHandler(
logger *logrus.Logger,
chClient clickhouse.ClientInterface,
adminService *admin.Service,
validator *validation.DependencyValidator,
modelExecutor Executor,
transformations []models.Transformation,
) *TaskHandler
NewTaskHandler creates a new task handler
func (*TaskHandler) HandleTransformation ¶
HandleTransformation handles transformation tasks
func (*TaskHandler) Routes ¶
func (h *TaskHandler) Routes() map[string]asynq.HandlerFunc
Routes returns the task handler routes for Asynq
type TaskPayload ¶
type TaskPayload struct {
ModelID string `json:"model_id"`
Position uint64 `json:"position"`
Interval uint64 `json:"interval"`
EnqueuedAt time.Time `json:"enqueued_at"`
}
TaskPayload represents the payload for a transformation task
func (TaskPayload) QueueName ¶
func (p TaskPayload) QueueName() string
QueueName returns the queue name for this task payload
func (TaskPayload) UniqueID ¶
func (p TaskPayload) UniqueID() string
UniqueID returns a unique identifier for this task