Documentation
¶
Index ¶
- Constants
- func RunCLI()
- func RunServices(asCoordinator bool)
- type Coordinator
- type DB
- func (db *DB) AcquireLock(taskID int64, workerID string, lockDuration time.Duration) (bool, error)
- func (db *DB) AddTask(task *Task) error
- func (db *DB) CleanupExpiredLocks() error
- func (db *DB) CreateExecution(taskID int64, workerID string) (int64, error)
- func (db *DB) DeleteTask(name string) error
- func (db *DB) EnableTask(name string, enabled bool) error
- func (db *DB) GetNextTask() (*TaskWithExecution, error)
- func (db *DB) GetTask(name string) (*Task, error)
- func (db *DB) IsQueuePaused() bool
- func (db *DB) ListTasks() ([]Task, error)
- func (db *DB) RecordMetric(taskID int64, durationMs int64, status string) error
- func (db *DB) RefreshTask(name string) error
- func (db *DB) ReleaseLock(taskID int64, workerID string) error
- func (db *DB) SetQueuePaused(paused bool, pausedBy string) error
- func (db *DB) UpdateExecution(executionID int64, status string, errorMsg *string, durationMs int64) error
- type Task
- type TaskExecution
- type TaskExecutor
- type TaskWithExecution
- type Worker
Constants ¶
View Source
const DefaultCoordinatorURL = "http://127.0.0.1:4444"
View Source
const InsertTaskExecutionFmt = `` /* 153-byte string literal not displayed */
View Source
const UpdateTaskStatusFmt = `` /* 148-byte string literal not displayed */
Variables ¶
This section is empty.
Functions ¶
func RunServices ¶
func RunServices(asCoordinator bool)
Types ¶
type Coordinator ¶
type Coordinator struct {
// contains filtered or unexported fields
}
Coordinator manages the task queue and provides API endpoints.
func NewCoordinator ¶
func NewCoordinator(db *DB, httpAddr string) *Coordinator
func (*Coordinator) Start ¶
func (c *Coordinator) Start() error
Start begins the coordinator service.
type DB ¶
type DB struct {
*alfredo.DatabaseStruct
}
func (*DB) AcquireLock ¶
func (*DB) CleanupExpiredLocks ¶
func (*DB) CreateExecution ¶
func (*DB) DeleteTask ¶
func (*DB) GetNextTask ¶
func (db *DB) GetNextTask() (*TaskWithExecution, error)
func (*DB) IsQueuePaused ¶
func (*DB) RecordMetric ¶
func (*DB) RefreshTask ¶
RefreshTask clears execution history for a task so it can run again. This is useful for one-shot tasks that need to be re-run.
type Task ¶
type Task struct {
ID int64 `json:"id"`
Name string `json:"name"`
Enabled bool `json:"enabled"`
Priority int `json:"priority"`
CooldownSeconds int `json:"cooldown_seconds"`
MaxRetries int `json:"max_retries"`
Requeue bool `json:"requeue"`
TaskType string `json:"task_type"`
Args string `json:"args"` // JSON string
CreatedAt alfredo.EpochTime `json:"created_at"`
UpdatedAt alfredo.EpochTime `json:"updated_at"`
}
Task represents a task definition.
func (*Task) ShouldRequeue ¶
type TaskExecution ¶
type TaskExecution struct {
ID int64 `json:"id"`
TaskID int64 `json:"task_id"`
StartedAt *time.Time `json:"started_at"`
FinishedAt *time.Time `json:"finished_at"`
Status string `json:"status"`
ErrorMessage *string `json:"error_message"`
RetryCount int `json:"retry_count"`
WorkerID *string `json:"worker_id"`
DurationMs *int64 `json:"duration_ms"`
}
TaskExecution represents a single execution of a task.
type TaskExecutor ¶
type TaskExecutor struct {
// contains filtered or unexported fields
}
TaskExecutor executes tasks based on their type.
func NewTaskExecutor ¶
func NewTaskExecutor(db *DB, workerID string) *TaskExecutor
func (*TaskExecutor) Execute ¶
func (te *TaskExecutor) Execute(twe *TaskWithExecution) error
Execute runs a task and records the results.
type TaskWithExecution ¶
type TaskWithExecution struct {
Task Task
LastExecution *TaskExecution
}
TaskWithExecution combines task and execution info.
Click to show internal directories.
Click to hide internal directories.