ctq

package
v0.7.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 10, 2026 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

View Source
const DefaultCoordinatorURL = "http://127.0.0.1:4444"
View Source
const InsertTaskExecutionFmt = `` /* 153-byte string literal not displayed */
View Source
const UpdateTaskStatusFmt = `` /* 148-byte string literal not displayed */

Variables

This section is empty.

Functions

func RunCLI

func RunCLI()

func RunServices

func RunServices(asCoordinator bool)

Types

type Coordinator

type Coordinator struct {
	// contains filtered or unexported fields
}

Coordinator manages the task queue and provides API endpoints

func NewCoordinator

func NewCoordinator(db *DB, httpAddr string) *Coordinator

func (*Coordinator) Start

func (c *Coordinator) Start() error

Start begins the coordinator service

type DB

type DB struct {
	*alfredo.DatabaseStruct
}

func InitDB

func InitDB(dbPath string) (*DB, error)

func (*DB) AcquireLock

func (db *DB) AcquireLock(taskID int64, workerID string, lockDuration time.Duration) (bool, error)

func (*DB) AddTask

func (db *DB) AddTask(task *Task) error

func (*DB) CleanupExpiredLocks

func (db *DB) CleanupExpiredLocks() error

func (*DB) CreateExecution

func (db *DB) CreateExecution(taskID int64, workerID string) (int64, error)

func (*DB) DeleteTask

func (db *DB) DeleteTask(name string) error

func (*DB) EnableTask

func (db *DB) EnableTask(name string, enabled bool) error

func (*DB) GetNextTask

func (db *DB) GetNextTask() (*TaskWithExecution, error)

func (*DB) GetTask

func (db *DB) GetTask(name string) (*Task, error)

func (*DB) IsQueuePaused

func (db *DB) IsQueuePaused() bool

func (*DB) ListTasks

func (db *DB) ListTasks() ([]Task, error)

func (*DB) RecordMetric

func (db *DB) RecordMetric(taskID int64, durationMs int64, status string) error

func (*DB) RefreshTask

func (db *DB) RefreshTask(name string) error

RefreshTask clears execution history for a task so it can run again This is useful for one-shot tasks that need to be re-run

func (*DB) ReleaseLock

func (db *DB) ReleaseLock(taskID int64, workerID string) error

func (*DB) SetQueuePaused

func (db *DB) SetQueuePaused(paused bool, pausedBy string) error

func (*DB) UpdateExecution

func (db *DB) UpdateExecution(executionID int64, status string, errorMsg *string, durationMs int64) error

type Task

type Task struct {
	ID              int64             `json:"id"`
	Name            string            `json:"name"`
	Enabled         bool              `json:"enabled"`
	Priority        int               `json:"priority"`
	CooldownSeconds int               `json:"cooldown_seconds"`
	MaxRetries      int               `json:"max_retries"`
	Requeue         bool              `json:"requeue"`
	TaskType        string            `json:"task_type"`
	Args            string            `json:"args"` // JSON string
	CreatedAt       alfredo.EpochTime `json:"created_at"`
	UpdatedAt       alfredo.EpochTime `json:"updated_at"`
}

Task represents a task definition

func (*Task) IsEnabled

func (t *Task) IsEnabled() bool

func (*Task) ShouldRequeue

func (t *Task) ShouldRequeue() bool

type TaskExecution

type TaskExecution struct {
	ID           int64      `json:"id"`
	TaskID       int64      `json:"task_id"`
	StartedAt    *time.Time `json:"started_at"`
	FinishedAt   *time.Time `json:"finished_at"`
	Status       string     `json:"status"`
	ErrorMessage *string    `json:"error_message"`
	RetryCount   int        `json:"retry_count"`
	WorkerID     *string    `json:"worker_id"`
	DurationMs   *int64     `json:"duration_ms"`
}

TaskExecution represents a single execution of a task

type TaskExecutor

type TaskExecutor struct {
	// contains filtered or unexported fields
}

TaskExecutor executes tasks based on their type

func NewTaskExecutor

func NewTaskExecutor(db *DB, workerID string) *TaskExecutor

func (*TaskExecutor) Execute

func (te *TaskExecutor) Execute(twe *TaskWithExecution) error

Execute runs a task and records the results

type TaskWithExecution

type TaskWithExecution struct {
	Task          Task
	LastExecution *TaskExecution
}

TaskWithExecution combines task and execution info

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker processes tasks from the queue

func NewWorker

func NewWorker(db *DB, workerID string) *Worker

func (*Worker) Start

func (w *Worker) Start() error

Start begins the worker loop

func (*Worker) Stop

func (w *Worker) Stop()

Stop signals the worker to stop

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL