Documentation
¶
Index ¶
- Constants
- type Queue
- func (q *Queue) Ack(ctx context.Context, taskID string) error
- func (q *Queue) CancelTask(ctx context.Context, taskID string) error
- func (q *Queue) Close() error
- func (q *Queue) CountTasks(ctx context.Context, filter driven.TaskFilter) (int64, error)
- func (q *Queue) Dequeue(ctx context.Context) (*domain.Task, error)
- func (q *Queue) DequeueWithTimeout(ctx context.Context, timeout int) (*domain.Task, error)
- func (q *Queue) Enqueue(ctx context.Context, task *domain.Task) error
- func (q *Queue) EnqueueBatch(ctx context.Context, tasks []*domain.Task) error
- func (q *Queue) GetJobStats(ctx context.Context, teamID string, period domain.AnalyticsPeriod) (*domain.JobStats, error)
- func (q *Queue) GetTask(ctx context.Context, taskID string) (*domain.Task, error)
- func (q *Queue) ListTasks(ctx context.Context, filter driven.TaskFilter) ([]*domain.Task, error)
- func (q *Queue) Nack(ctx context.Context, taskID string, reason string) error
- func (q *Queue) Ping(ctx context.Context) error
- func (q *Queue) PurgeTasks(ctx context.Context, olderThanSeconds int) (int, error)
- func (q *Queue) Stats(ctx context.Context) (*driven.QueueStats, error)
Constants ¶
const CreateTasksTableSQL = `` /* 889-byte string literal not displayed */
SQL for creating the tasks table (to be used in migrations)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
Queue implements TaskQueue using PostgreSQL with SKIP LOCKED for reliable task processing. This is the fallback queue when Redis is not available.
func NewQueue ¶
NewQueue creates a new PostgreSQL-backed task queue. Assumes the tasks table has been created via migrations.
func (*Queue) CancelTask ¶
CancelTask cancels a pending task
func (*Queue) CountTasks ¶
CountTasks returns the total number of tasks matching the filter
func (*Queue) Dequeue ¶
Dequeue retrieves the next available task using SELECT FOR UPDATE SKIP LOCKED. This ensures only one worker gets each task even with multiple workers.
func (*Queue) DequeueWithTimeout ¶
DequeueWithTimeout retrieves the next task, waiting up to timeout seconds
func (*Queue) EnqueueBatch ¶
EnqueueBatch adds multiple tasks atomically
func (*Queue) GetJobStats ¶
func (q *Queue) GetJobStats(ctx context.Context, teamID string, period domain.AnalyticsPeriod) (*domain.JobStats, error)
GetJobStats computes aggregated job statistics for a time period
func (*Queue) PurgeTasks ¶
PurgeTasks removes old completed/failed tasks