Documentation ¶
Overview ¶
Package queuemanager provides high-level task queue management operations. It implements the TaskQueueManager interface and coordinates task storage, metrics tracking, and payload validation.
Index ¶
- type TaskQueueManager
- func NewTaskQueueManager(taskStorage storages.Task) *TaskQueueManager
- func (m *TaskQueueManager) AddTaskToQueue(ctx context.Context, task *entity.Task) error
- func (m *TaskQueueManager) AsyncAddTaskToQueue(ctx context.Context, task *entity.Task)
- func (m *TaskQueueManager) CancelTask(ctx context.Context, taskID uuid.UUID) error
- func (m *TaskQueueManager) GetTask(ctx context.Context, taskID uuid.UUID) (*entity.Task, error)
- func (m *TaskQueueManager) GetTasks(ctx context.Context, filter *dbentity.GetTasksFilter, limit int64) ([]*entity.Task, error)
- func (m *TaskQueueManager) ResetAttempts(ctx context.Context, taskID uuid.UUID) error
- func (m *TaskQueueManager) WaitAsyncEnqueues()
Types ¶
type TaskQueueManager ¶
type TaskQueueManager struct {
// contains filtered or unexported fields
}
TaskQueueManager provides a high-level API for managing tasks in the queue. It combines task creation and storage operations in a single interface.
func NewTaskQueueManager ¶
func NewTaskQueueManager(taskStorage storages.Task) *TaskQueueManager
NewTaskQueueManager creates a new TaskQueueManager instance with the specified task storage.
func (*TaskQueueManager) AddTaskToQueue ¶
func (m *TaskQueueManager) AddTaskToQueue(ctx context.Context, task *entity.Task) error
AddTaskToQueue adds a task to the queue and returns an error if the operation fails.
func (*TaskQueueManager) AsyncAddTaskToQueue ¶
func (m *TaskQueueManager) AsyncAddTaskToQueue(ctx context.Context, task *entity.Task)
AsyncAddTaskToQueue adds a task to the queue asynchronously without waiting for completion.
The goroutine can outlive the calling function, so any *sqlx.Tx carried in ctx is stripped before dispatch; dbtx.WithoutTx itself logs a WARN if there was a tx to strip. Enrolling the async write in the caller's tx would race against the caller's Commit/Rollback.
In-flight goroutines are tracked by an internal WaitGroup that WaitAsyncEnqueues drains during graceful shutdown (Goque.Stop()). Callers using the manager outside Goque must call WaitAsyncEnqueues themselves before tearing down the underlying *sqlx.DB.
func (*TaskQueueManager) CancelTask ¶ added in v0.5.0
func (m *TaskQueueManager) CancelTask(ctx context.Context, taskID uuid.UUID) error
CancelTask marks a non-terminal task as canceled.
func (*TaskQueueManager) GetTask ¶
func (m *TaskQueueManager) GetTask(ctx context.Context, taskID uuid.UUID) (*entity.Task, error)
func (*TaskQueueManager) GetTasks ¶
func (m *TaskQueueManager) GetTasks(ctx context.Context, filter *dbentity.GetTasksFilter, limit int64) ([]*entity.Task, error)
GetTasks retrieves tasks from the queue based on the provided filter criteria. The limit parameter controls the maximum number of tasks to return.
func (*TaskQueueManager) ResetAttempts ¶
func (m *TaskQueueManager) ResetAttempts(ctx context.Context, taskID uuid.UUID) error
ResetAttempts resets the retry attempts counter for a task and sets its status back to new. This allows a failed task to be retried from the beginning.
func (*TaskQueueManager) WaitAsyncEnqueues ¶ added in v0.8.3
func (m *TaskQueueManager) WaitAsyncEnqueues()
WaitAsyncEnqueues blocks until every in-flight goroutine spawned by AsyncAddTaskToQueue has returned. Goque.Stop() calls this automatically. Direct users of TaskQueueManager (outside the Goque facade) should call it before closing the underlying *sqlx.DB to avoid "sql: database is closed" errors from late async writes.