Documentation
¶
Index ¶
- type Queue
- func (q *Queue) Ack(ctx context.Context, taskID string) error
- func (q *Queue) CancelTask(ctx context.Context, taskID string) error
- func (q *Queue) Close() error
- func (q *Queue) CountTasks(ctx context.Context, filter driven.TaskFilter) (int64, error)
- func (q *Queue) Dequeue(ctx context.Context) (*domain.Task, error)
- func (q *Queue) DequeueWithTimeout(ctx context.Context, timeout int) (*domain.Task, error)
- func (q *Queue) Enqueue(ctx context.Context, task *domain.Task) error
- func (q *Queue) EnqueueBatch(ctx context.Context, tasks []*domain.Task) error
- func (q *Queue) GetJobStats(ctx context.Context, teamID string, period domain.AnalyticsPeriod) (*domain.JobStats, error)
- func (q *Queue) GetTask(ctx context.Context, taskID string) (*domain.Task, error)
- func (q *Queue) ListTasks(ctx context.Context, filter driven.TaskFilter) ([]*domain.Task, error)
- func (q *Queue) Nack(ctx context.Context, taskID string, reason string) error
- func (q *Queue) Ping(ctx context.Context) error
- func (q *Queue) PurgeTasks(ctx context.Context, olderThanSeconds int) (int, error)
- func (q *Queue) Stats(ctx context.Context) (*driven.QueueStats, error)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
Queue implements TaskQueue using Redis Streams. Redis Streams provide reliable message queuing with consumer groups, automatic acknowledgment tracking, and dead letter handling.
func NewQueue ¶
NewQueue creates a new Redis-backed task queue. The consumerName should be unique per worker instance (e.g., hostname + PID).
func (*Queue) CancelTask ¶
CancelTask marks a pending task as cancelled.
func (*Queue) CountTasks ¶
CountTasks returns the total number of tasks matching the filter Note: Redis implementation scans all tasks - for large datasets, consider PostgreSQL
func (*Queue) Dequeue ¶
Dequeue retrieves the next available task for processing. This blocks until a task is available or context is cancelled.
func (*Queue) DequeueWithTimeout ¶
DequeueWithTimeout retrieves the next available task, waiting up to timeout seconds.
func (*Queue) EnqueueBatch ¶
EnqueueBatch adds multiple tasks to the queue atomically.
func (*Queue) GetJobStats ¶
func (q *Queue) GetJobStats(ctx context.Context, teamID string, period domain.AnalyticsPeriod) (*domain.JobStats, error)
GetJobStats computes aggregated job statistics for a time period Note: Redis implementation scans all tasks - for large datasets, consider PostgreSQL
func (*Queue) ListTasks ¶
ListTasks retrieves tasks matching the filter criteria. Note: This is less efficient in Redis than Postgres for complex queries.
func (*Queue) PurgeTasks ¶
PurgeTasks removes completed/failed tasks older than the specified age.