task

package
v0.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 28, 2025 License: MIT Imports: 14 Imported by: 0

Documentation

Overview

Package task provides storage operations for task management in the queue system.

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrDuplicateTask is returned when attempting to insert a task with a duplicate external ID.
	ErrDuplicateTask = errors.New("task already exists")
	// ErrInvalidPayloadFormat is returned when the task payload is not valid JSON.
	ErrInvalidPayloadFormat = errors.New("payload format is invalid. should be json")
)

Functions

This section is empty.

Types

type GetTasksFilter

type GetTasksFilter struct {
	TaskType         *entity.TaskType
	Status           *entity.TaskStatus
	UpdatedAtTimeAgo *time.Duration
}

GetTasksFilter defines filtering criteria for task queries.

type Storage

type Storage struct {
	// contains filtered or unexported fields
}

Storage handles database operations for tasks.

func NewStorage

func NewStorage(db *sqlx.DB) *Storage

NewStorage creates a new Storage instance with the provided database connection.

func (*Storage) AddTask

func (s *Storage) AddTask(ctx context.Context, task *entity.Task) error

AddTask inserts a new task into the database.

func (*Storage) CureTasks

func (s *Storage) CureTasks(ctx context.Context, taskType entity.TaskType, unhealthStatuses []entity.TaskStatus, updatedAtTimeAgo time.Duration, comment string) ([]*entity.Task, error)

CureTasks updates stuck tasks to error status for retry.

func (*Storage) DeleteTasks

func (s *Storage) DeleteTasks(
	ctx context.Context,
	taskType entity.TaskType,
	statuses []entity.TaskStatus,
	updatedAtTimeAgo time.Duration,
) ([]*entity.Task, error)

DeleteTasks removes tasks with specified statuses that haven't been updated within the given time period.

func (*Storage) GetTask

func (s *Storage) GetTask(ctx context.Context, id uuid.UUID) (*entity.Task, error)

GetTask retrieves a task from the database by its ID.

func (*Storage) GetTasks

func (s *Storage) GetTasks(ctx context.Context, filter *GetTasksFilter, limit int64) ([]*entity.Task, error)

GetTasks retrieves tasks matching the filter criteria with a specified limit.

func (*Storage) GetTasksForProcessing

func (s *Storage) GetTasksForProcessing(ctx context.Context, taskType entity.TaskType, limit int64) ([]*entity.Task, error)

GetTasksForProcessing retrieves and locks tasks ready for processing, updating their status to pending.

func (*Storage) UpdateTask

func (s *Storage) UpdateTask(ctx context.Context, taskID uuid.UUID, task *entity.Task) error

UpdateTask updates an existing task in the database with the provided data.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL