redis

package
v0.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 16, 2026 License: Apache-2.0 Imports: 8 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue implements TaskQueue using Redis Streams. Redis Streams provide reliable message queuing with consumer groups, automatic acknowledgment tracking, and dead letter handling.

func NewQueue

func NewQueue(client *redis.Client, consumerName string) (*Queue, error)

NewQueue creates a new Redis-backed task queue. The consumerName should be unique per worker instance (e.g., hostname + PID).

func (*Queue) Ack

func (q *Queue) Ack(ctx context.Context, taskID string) error

Ack acknowledges successful completion of a task.

func (*Queue) CancelTask

func (q *Queue) CancelTask(ctx context.Context, taskID string) error

CancelTask marks a pending task as cancelled.

func (*Queue) Close

func (q *Queue) Close() error

Close cleans up resources.

func (*Queue) CountTasks

func (q *Queue) CountTasks(ctx context.Context, filter driven.TaskFilter) (int64, error)

CountTasks returns the total number of tasks matching the filter Note: Redis implementation scans all tasks - for large datasets, consider PostgreSQL

func (*Queue) Dequeue

func (q *Queue) Dequeue(ctx context.Context) (*domain.Task, error)

Dequeue retrieves the next available task for processing. This blocks until a task is available or context is cancelled.

func (*Queue) DequeueWithTimeout

func (q *Queue) DequeueWithTimeout(ctx context.Context, timeout int) (*domain.Task, error)

DequeueWithTimeout retrieves the next available task, waiting up to timeout seconds.

func (*Queue) Enqueue

func (q *Queue) Enqueue(ctx context.Context, task *domain.Task) error

Enqueue adds a task to the queue for processing.

func (*Queue) EnqueueBatch

func (q *Queue) EnqueueBatch(ctx context.Context, tasks []*domain.Task) error

EnqueueBatch adds multiple tasks to the queue atomically.

func (*Queue) GetJobStats

func (q *Queue) GetJobStats(ctx context.Context, teamID string, period domain.AnalyticsPeriod) (*domain.JobStats, error)

GetJobStats computes aggregated job statistics for a time period Note: Redis implementation scans all tasks - for large datasets, consider PostgreSQL

func (*Queue) GetTask

func (q *Queue) GetTask(ctx context.Context, taskID string) (*domain.Task, error)

GetTask retrieves a task by ID.

func (*Queue) ListTasks

func (q *Queue) ListTasks(ctx context.Context, filter driven.TaskFilter) ([]*domain.Task, error)

ListTasks retrieves tasks matching the filter criteria. Note: This is less efficient in Redis than Postgres for complex queries.

func (*Queue) Nack

func (q *Queue) Nack(ctx context.Context, taskID string, reason string) error

Nack indicates task processing failed and should be retried.

func (*Queue) Ping

func (q *Queue) Ping(ctx context.Context) error

Ping checks if the queue backend is healthy.

func (*Queue) PurgeTasks

func (q *Queue) PurgeTasks(ctx context.Context, olderThanSeconds int) (int, error)

PurgeTasks removes completed/failed tasks older than the specified age.

func (*Queue) Stats

func (q *Queue) Stats(ctx context.Context) (*driven.QueueStats, error)

Stats returns queue statistics.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL