postgres

package
v0.2.1 Latest
Warning

This package is not in the latest version of its module.

Published: Apr 16, 2026 License: Apache-2.0 Imports: 7 Imported by: 0

Documentation

Constants

const CreateTasksTableSQL = `` /* 889-byte string literal not displayed */

CreateTasksTableSQL is the SQL statement that creates the tasks table. It is intended to be run from migrations.

Variables

This section is empty.

Functions

This section is empty.

Types

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue implements TaskQueue using PostgreSQL with SKIP LOCKED for reliable task processing. This is the fallback queue when Redis is not available.

func NewQueue

func NewQueue(db *sql.DB) *Queue

NewQueue creates a new PostgreSQL-backed task queue. Assumes the tasks table has been created via migrations.

func (*Queue) Ack

func (q *Queue) Ack(ctx context.Context, taskID string) error

Ack marks a task as completed

func (*Queue) CancelTask

func (q *Queue) CancelTask(ctx context.Context, taskID string) error

CancelTask cancels a pending task

func (*Queue) Close

func (q *Queue) Close() error

Close is a no-op for the Postgres queue; the database connection is managed externally.

func (*Queue) CountTasks

func (q *Queue) CountTasks(ctx context.Context, filter driven.TaskFilter) (int64, error)

CountTasks returns the total number of tasks matching the filter

func (*Queue) Dequeue

func (q *Queue) Dequeue(ctx context.Context) (*domain.Task, error)

Dequeue retrieves the next available task using SELECT FOR UPDATE SKIP LOCKED. Rows already locked by another transaction are skipped, so each task is handed to only one worker even when several workers dequeue concurrently.
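The SKIP LOCKED pattern that Dequeue describes can be sketched as a single claim statement. This is a minimal illustration, not the package's actual query: the contents of CreateTasksTableSQL are not shown on this page, so the column names (id, status, payload, created_at), the status values, and the helper claimNext are all assumptions.

```go
package main

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
)

// claimSQL claims one pending row and marks it as in progress.
// ASSUMPTION: the columns id, status, payload, created_at and the status
// values 'pending' and 'processing' are hypothetical; only the table name
// "tasks" comes from the package documentation.
const claimSQL = `
UPDATE tasks
SET status = 'processing'
WHERE id = (
    SELECT id
    FROM tasks
    WHERE status = 'pending'
    ORDER BY created_at
    LIMIT 1
    FOR UPDATE SKIP LOCKED
)
RETURNING id, payload`

// claimNext (hypothetical helper) runs claimSQL. It reports ok == false when
// every pending row is either absent or locked by another worker.
func claimNext(ctx context.Context, db *sql.DB) (id string, payload []byte, ok bool, err error) {
	err = db.QueryRowContext(ctx, claimSQL).Scan(&id, &payload)
	if errors.Is(err, sql.ErrNoRows) {
		return "", nil, false, nil
	}
	if err != nil {
		return "", nil, false, err
	}
	return id, payload, true, nil
}

func main() {
	// Printing the statement keeps the sketch runnable without a database.
	fmt.Println(claimSQL)
}
```

Because the inner SELECT skips rows that another transaction has locked, two workers running this statement at the same moment receive different rows instead of blocking on each other.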

func (*Queue) DequeueWithTimeout

func (q *Queue) DequeueWithTimeout(ctx context.Context, timeout int) (*domain.Task, error)

DequeueWithTimeout retrieves the next available task, waiting up to timeout seconds for one to become available.
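One way to wait for a task with a timeout is to poll a non-blocking dequeue at a fixed interval. The sketch below shows that approach only; the package may wait differently (for example with LISTEN/NOTIFY). The Task type, tryFunc, pollWithTimeout, and the convention that a nil task means "queue empty" are assumptions made for illustration.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Task is a hypothetical stand-in for domain.Task.
type Task struct{ ID string }

// tryFunc attempts one non-blocking dequeue.
// ASSUMPTION: a nil task with a nil error means the queue is empty.
type tryFunc func(ctx context.Context) (*Task, error)

// pollWithTimeout calls try immediately and then once per interval until a
// task arrives, timeoutSeconds elapse, or parent is cancelled. It returns
// (nil, nil) on timeout and the parent's error on cancellation.
// interval must be positive, as required by time.NewTicker.
func pollWithTimeout(parent context.Context, timeoutSeconds int, interval time.Duration, try tryFunc) (*Task, error) {
	ctx, cancel := context.WithTimeout(parent, time.Duration(timeoutSeconds)*time.Second)
	defer cancel()

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		task, err := try(ctx)
		if err != nil || task != nil {
			return task, err
		}
		select {
		case <-ctx.Done():
			if err := parent.Err(); err != nil {
				return nil, err // caller cancelled
			}
			return nil, nil // timed out with nothing to do
		case <-ticker.C:
			// Next polling attempt.
		}
	}
}

// pollDemo runs the loop against a fake queue that is empty for the first
// two attempts and yields a task on the third.
func pollDemo() (id string, attempts int) {
	try := func(ctx context.Context) (*Task, error) {
		attempts++
		if attempts < 3 {
			return nil, nil
		}
		return &Task{ID: "task-1"}, nil
	}
	task, err := pollWithTimeout(context.Background(), 1, time.Millisecond, try)
	if err != nil || task == nil {
		return "", attempts
	}
	return task.ID, attempts
}

func main() {
	id, attempts := pollDemo()
	fmt.Println(id, attempts)
}
```

Polling trades a little latency (at most one interval) for simplicity, since it needs nothing beyond the claim query itself.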

func (*Queue) Enqueue

func (q *Queue) Enqueue(ctx context.Context, task *domain.Task) error

Enqueue adds a task to the queue

func (*Queue) EnqueueBatch

func (q *Queue) EnqueueBatch(ctx context.Context, tasks []*domain.Task) error

EnqueueBatch adds multiple tasks atomically
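Atomic batch insertion is usually achieved by running every insert inside one transaction and committing only if all of them succeed. The following sketch illustrates that idea under stated assumptions: the txn interface is a hypothetical subset of *sql.Tx (so a fake can stand in for a real database), the Task type replaces domain.Task, and the column names in insertSQL are invented.

```go
package main

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
)

// Task is a hypothetical stand-in for domain.Task.
type Task struct {
	ID      string
	Payload []byte
}

// txn is the subset of *sql.Tx this sketch needs; *sql.Tx satisfies it.
type txn interface {
	ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
	Commit() error
	Rollback() error
}

// ASSUMPTION: the column names id and payload are hypothetical.
const insertSQL = `INSERT INTO tasks (id, payload) VALUES ($1, $2)`

// insertAll inserts every task and commits. On the first failure it rolls
// back, so either all tasks become visible or none do.
func insertAll(ctx context.Context, tx txn, tasks []*Task) error {
	for _, t := range tasks {
		if _, err := tx.ExecContext(ctx, insertSQL, t.ID, t.Payload); err != nil {
			_ = tx.Rollback() // the insert error is the one worth reporting
			return err
		}
	}
	return tx.Commit()
}

// fakeTx records calls and fails on the failAt-th insert (0 = never).
type fakeTx struct {
	failAt     int
	execs      int
	committed  bool
	rolledBack bool
}

func (f *fakeTx) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error) {
	f.execs++
	if f.execs == f.failAt {
		return nil, errors.New("simulated insert failure")
	}
	return nil, nil
}
func (f *fakeTx) Commit() error   { f.committed = true; return nil }
func (f *fakeTx) Rollback() error { f.rolledBack = true; return nil }

// batchDemo inserts three tasks through a fake transaction and summarizes
// what happened.
func batchDemo(failAt int) string {
	tx := &fakeTx{failAt: failAt}
	tasks := []*Task{{ID: "a"}, {ID: "b"}, {ID: "c"}}
	err := insertAll(context.Background(), tx, tasks)
	return fmt.Sprintf("err=%t committed=%t rolledBack=%t execs=%d",
		err != nil, tx.committed, tx.rolledBack, tx.execs)
}

func main() {
	fmt.Println(batchDemo(0))
	fmt.Println(batchDemo(2))
}
```

With a real database, the transaction would come from db.BeginTx(ctx, nil) and be passed to insertAll in place of the fake.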

func (*Queue) GetJobStats

func (q *Queue) GetJobStats(ctx context.Context, teamID string, period domain.AnalyticsPeriod) (*domain.JobStats, error)

GetJobStats computes aggregated job statistics for a time period

func (*Queue) GetTask

func (q *Queue) GetTask(ctx context.Context, taskID string) (*domain.Task, error)

GetTask retrieves a task by ID

func (*Queue) ListTasks

func (q *Queue) ListTasks(ctx context.Context, filter driven.TaskFilter) ([]*domain.Task, error)

ListTasks retrieves tasks matching the filter

func (*Queue) Nack

func (q *Queue) Nack(ctx context.Context, taskID string, reason string) error

Nack marks a task as failed, potentially scheduling a retry
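Dequeue, Ack, and Nack are typically combined in a worker loop: take a task, run a handler, then acknowledge success or report failure. The sketch below shows one such loop against a local interface that mirrors the three signatures above. The Task type, the taskQueue interface, the in-memory memQueue, and the convention that Dequeue returns a nil task when the queue is empty are assumptions; the real package uses *domain.Task and may signal an empty queue differently.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Task is a hypothetical stand-in for domain.Task.
type Task struct{ ID string }

// taskQueue mirrors the documented Dequeue, Ack, and Nack signatures.
type taskQueue interface {
	Dequeue(ctx context.Context) (*Task, error)
	Ack(ctx context.Context, taskID string) error
	Nack(ctx context.Context, taskID string, reason string) error
}

// drain processes tasks until the queue is empty.
// ASSUMPTION: an empty queue is reported as a nil task with a nil error.
func drain(ctx context.Context, q taskQueue, handle func(context.Context, *Task) error) error {
	for {
		task, err := q.Dequeue(ctx)
		if err != nil {
			return err
		}
		if task == nil {
			return nil
		}
		if herr := handle(ctx, task); herr != nil {
			// Report the failure; the queue decides whether to retry.
			if err := q.Nack(ctx, task.ID, herr.Error()); err != nil {
				return err
			}
			continue
		}
		if err := q.Ack(ctx, task.ID); err != nil {
			return err
		}
	}
}

// memQueue is an in-memory fake used only to exercise drain.
type memQueue struct {
	pending []*Task
	acked   []string
	nacked  []string
}

func (m *memQueue) Dequeue(ctx context.Context) (*Task, error) {
	if len(m.pending) == 0 {
		return nil, nil
	}
	t := m.pending[0]
	m.pending = m.pending[1:]
	return t, nil
}

func (m *memQueue) Ack(ctx context.Context, taskID string) error {
	m.acked = append(m.acked, taskID)
	return nil
}

func (m *memQueue) Nack(ctx context.Context, taskID string, reason string) error {
	m.nacked = append(m.nacked, taskID)
	return nil
}

// drainDemo runs three tasks through drain; the handler fails for "bad-2".
func drainDemo() string {
	q := &memQueue{pending: []*Task{{ID: "ok-1"}, {ID: "bad-2"}, {ID: "ok-3"}}}
	handle := func(_ context.Context, t *Task) error {
		if t.ID == "bad-2" {
			return errors.New("handler failed")
		}
		return nil
	}
	if err := drain(context.Background(), q, handle); err != nil {
		return "error: " + err.Error()
	}
	return fmt.Sprintf("acked=%v nacked=%v", q.acked, q.nacked)
}

func main() {
	fmt.Println(drainDemo())
}
```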

func (*Queue) Ping

func (q *Queue) Ping(ctx context.Context) error

Ping checks database connectivity

func (*Queue) PurgeTasks

func (q *Queue) PurgeTasks(ctx context.Context, olderThanSeconds int) (int, error)

PurgeTasks removes completed and failed tasks older than olderThanSeconds seconds.
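A purge of this kind usually converts the age in seconds into a cutoff timestamp and deletes finished rows older than it. The sketch below illustrates that conversion. The status values, the updated_at column, and the helpers purgeCutoff and purge are assumptions, since the actual schema and query are not shown on this page.

```go
package main

import (
	"context"
	"database/sql"
	"fmt"
	"time"
)

// ASSUMPTION: the status values and the updated_at column are hypothetical.
const purgeSQL = `
DELETE FROM tasks
WHERE status IN ('completed', 'failed')
  AND updated_at < $1`

// purgeCutoff returns the instant before which finished tasks are purged.
func purgeCutoff(now time.Time, olderThanSeconds int) time.Time {
	return now.Add(-time.Duration(olderThanSeconds) * time.Second)
}

// purge (hypothetical helper) deletes old finished tasks and returns how
// many rows were removed.
func purge(ctx context.Context, db *sql.DB, olderThanSeconds int) (int, error) {
	res, err := db.ExecContext(ctx, purgeSQL, purgeCutoff(time.Now(), olderThanSeconds))
	if err != nil {
		return 0, err
	}
	n, err := res.RowsAffected()
	if err != nil {
		return 0, err
	}
	return int(n), nil
}

// cutoffDemo computes the cutoff for a one-hour retention window at a fixed
// reference time, so the result is deterministic.
func cutoffDemo() string {
	now := time.Date(2026, time.April, 16, 12, 0, 0, 0, time.UTC)
	return purgeCutoff(now, 3600).Format(time.RFC3339)
}

func main() {
	fmt.Println(cutoffDemo())
}
```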

func (*Queue) Stats

func (q *Queue) Stats(ctx context.Context) (*driven.QueueStats, error)

Stats returns queue statistics
