queuemanager

package
v0.8.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 2, 2026 License: MIT Imports: 13 Imported by: 0

Documentation

Overview

Package queuemanager provides high-level task queue management operations. It implements the TaskQueueManager interface and coordinates task storage, metrics tracking, and payload validation.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type TaskQueueManager

type TaskQueueManager struct {
	// contains filtered or unexported fields
}

TaskQueueManager provides a high-level API for managing tasks in the queue. It combines task creation and storage operations in a single interface.

func NewTaskQueueManager

func NewTaskQueueManager(taskStorage storages.Task) *TaskQueueManager

NewTaskQueueManager creates a new TaskQueueManager instance with the specified task storage.

func (*TaskQueueManager) AddTaskToQueue

func (m *TaskQueueManager) AddTaskToQueue(ctx context.Context, task *entity.Task) error

AddTaskToQueue adds a task to the queue and returns an error if the operation fails.

func (*TaskQueueManager) AsyncAddTaskToQueue

func (m *TaskQueueManager) AsyncAddTaskToQueue(ctx context.Context, task *entity.Task)

AsyncAddTaskToQueue adds a task to the queue asynchronously without waiting for completion.

The goroutine outlives the caller's stack, so any *sqlx.Tx carried in ctx is stripped before dispatch (dbtx.WithoutTx itself logs WARN if there was a tx to strip) — enrolling the async write in the caller's tx would race against the caller's Commit/Rollback.

In-flight goroutines are tracked by an internal WaitGroup that WaitAsyncEnqueues drains during graceful shutdown (Goque.Stop()). Callers using the manager outside Goque must call WaitAsyncEnqueues themselves before tearing down the underlying *sqlx.DB.

func (*TaskQueueManager) CancelTask added in v0.5.0

func (m *TaskQueueManager) CancelTask(ctx context.Context, taskID uuid.UUID) error

CancelTask marks a non-terminal task as canceled.

func (*TaskQueueManager) GetTask

func (m *TaskQueueManager) GetTask(ctx context.Context, taskID uuid.UUID) (*entity.Task, error)

GetTask retrieves a single task by its ID from the queue.

func (*TaskQueueManager) GetTasks

func (m *TaskQueueManager) GetTasks(ctx context.Context, filter *dbentity.GetTasksFilter, limit int64) ([]*entity.Task, error)

GetTasks retrieves tasks from the queue based on the provided filter criteria. The limit parameter controls the maximum number of tasks to return.

func (*TaskQueueManager) ResetAttempts

func (m *TaskQueueManager) ResetAttempts(ctx context.Context, taskID uuid.UUID) error

ResetAttempts resets the retry attempts counter for a task and sets its status back to new. This allows a failed task to be retried from the beginning.

func (*TaskQueueManager) WaitAsyncEnqueues added in v0.8.3

func (m *TaskQueueManager) WaitAsyncEnqueues()

WaitAsyncEnqueues blocks until every in-flight goroutine spawned by AsyncAddTaskToQueue has returned. Goque.Stop() calls this automatically. Direct users of TaskQueueManager (outside the Goque facade) should call it before closing the underlying *sqlx.DB to avoid "sql: database is closed" errors from late async writes.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL